
SQLExecuteQueryOperator AttributeError exception when returning result to XCom #31080

Closed · 1 of 2 tasks
Stormhand opened this issue May 5, 2023 · 4 comments · Fixed by #31136
Labels
area:core · kind:bug

Comments

@Stormhand

Apache Airflow version

2.6.0

What happened

I am using DatabricksSqlOperator, which writes the result to a file. When the task finishes, it writes all the data correctly to the file, then throws the following exception:

[2023-05-05, 07:56:22 UTC] {taskinstance.py:1847} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2377, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.9/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__

I found that SQLExecuteQueryOperator always returns the result from its execute() method (and thus pushes an XCom), except when the parameter do_xcom_push is set to False. But if do_xcom_push is False, the method _process_output() is not executed, and DatabricksSqlOperator won't write the results to a file.

What you think should happen instead

I am not sure whether the problem should be fixed in DatabricksSqlOperator or in SQLExecuteQueryOperator. In any case, setting do_xcom_push to False shouldn't automatically prevent the execution of _process_output():

        if not self.do_xcom_push:
            return None
        if return_single_query_results(self.sql, self.return_last, self.split_statements):
            # For simplicity, we pass always list as input to _process_output, regardless if
            # single query results are going to be returned, and we return the first element
            # of the list in this case from the (always) list returned by _process_output
            return self._process_output([output], hook.descriptions)[-1]
        return self._process_output(output, hook.descriptions)

What happens now is that I end up with the big result both in a file AND in XCom at the same time.

How to reproduce

I suspect that the actual exception is related to writing the XCom to the metadata database, so it might not fail in other scenarios.
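
For reference, a minimal DAG sketch of my setup that should hit the same code path (connection id, endpoint name, table, and output path are placeholders, not my real values):

    import pendulum
    from airflow import DAG
    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    with DAG(
        dag_id="databricks_sql_xcom_repro",
        start_date=pendulum.datetime(2023, 5, 1, tz="UTC"),
        schedule=None,
        catchup=False,
    ):
        # Writes the result set to a file via output_path/output_format;
        # with do_xcom_push left at its default (True), the raw Row objects
        # are also pushed to XCom, where JSON serialization blows up.
        DatabricksSqlOperator(
            task_id="export_to_file",
            databricks_conn_id="databricks_default",
            sql_endpoint_name="sql_endpoint",
            sql="SELECT * FROM some_schema.some_table",
            output_path="/tmp/result.csv",
            output_format="csv",
        )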

Operating System

Debian GNU/Linux 11 (bullseye) docker image

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-apache-spark==4.0.1
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-databricks==4.1.0
apache-airflow-providers-docker==3.6.0
apache-airflow-providers-elasticsearch==4.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.3.1
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-microsoft-azure==6.0.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-odbc==3.2.1
apache-airflow-providers-oracle==3.6.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-samba==4.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.4
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-snowflake==4.0.5
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.6.0
apache-airflow-providers-telegram==4.0.0

Deployment

Docker-Compose

Deployment details

Using an extended Airflow image, LocalExecutor, and a Postgres 13 metadata DB as a container in the same stack.
docker-compose version 1.29.2, build 5becea4c
Docker version 23.0.5, build bc4487a

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Stormhand added the area:core, kind:bug and needs-triage labels May 5, 2023
Stormhand changed the title from "SQLExecuteQueryOperator AttributeError exception when returning result to XCome" to "SQLExecuteQueryOperator AttributeError exception when returning result to XCom" May 5, 2023
potiuk (Member) commented May 8, 2023

Interesting one. Proposed a fix in #31136, @Stormhand.

potiuk removed the needs-triage label May 8, 2023
potiuk added a commit to potiuk/airflow that referenced this issue May 9, 2023
The change adds conditional processing of output based on
criteria that can be overridden by the operator extending the
common.sql BaseSQLOperator. Originally, output processing has only
been happening if "do_xcom_push" was enabled, but in some cases
we want to run processing also when do_xcom_push is disabled
(for example in case of databricks SQL operator, it might be
done when the output is redirected to a file).

This change enables it.

Fixes: apache#31080
potiuk added a commit that referenced this issue May 9, 2023
The change adds conditional processing of output based on
criteria that can be overridden by the operator extending the
common.sql BaseSQLOperator. Originally, output processing has only
been happening if "do_xcom_push" was enabled, but in some cases
we want to run processing also when do_xcom_push is disabled
(for example in case of databricks SQL operator, it might be
done when the output is redirected to a file).

This change enables it.

Fixes: #31080
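
To illustrate the overridable-criterion approach the commit message describes, here is a self-contained sketch of the pattern (all names here are hypothetical stand-ins, not the real Airflow classes; the actual change is in #31136):

    class BaseSQLOperatorSketch:
        def __init__(self, do_xcom_push: bool = True):
            self.do_xcom_push = do_xcom_push

        def _should_run_output_processing(self) -> bool:
            # Default criterion: only process output when it will be
            # pushed to XCom. Subclasses may widen this.
            return self.do_xcom_push

        def _process_output(self, results):
            return results

        def execute(self, results):
            if not self._should_run_output_processing():
                return None
            processed = self._process_output(results)
            # Only the returned value is pushed to XCom.
            return processed if self.do_xcom_push else None

    class DatabricksSqlOperatorSketch(BaseSQLOperatorSketch):
        def __init__(self, output_path=None, **kwargs):
            super().__init__(**kwargs)
            self._output_path = output_path

        def _should_run_output_processing(self) -> bool:
            # Also process output when it is redirected to a file,
            # even with do_xcom_push=False.
            return self.do_xcom_push or self._output_path is not None

        def _process_output(self, results):
            if self._output_path:
                with open(self._output_path, "w") as f:
                    f.write("\n".join(map(str, results)))
            return results

    op = DatabricksSqlOperatorSketch(do_xcom_push=False, output_path="/tmp/out.txt")
    assert op.execute([("a",), ("b",)]) is None  # file written, nothing for XCom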
Stormhand (Author) commented Jun 6, 2023

Hi @potiuk, unfortunately it happened again. This time I need do_xcom_push:

[2023-06-06, 08:52:24 UTC] {sql.py:375} INFO - Running statement: SELECT cast(max(id) as STRING) FROM prod.unified.sessions, parameters: None
[2023-06-06, 08:52:25 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2354, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 237, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 632, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 144, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 123, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 132, in serialize
    qn = qualname(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/module_loading.py", line 47, in qualname
    return f"{o.__module__}.{o.__name__}"
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/types.py", line 161, in __getattr__
    raise AttributeError(item)
AttributeError: __name__. Did you mean: '__ne__'?
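
The last frames show what goes wrong: databricks.sql.types.Row defines __getattr__ to raise AttributeError for unknown attributes, and Airflow's serializer probes the object's __name__ via qualname(). A minimal sketch of the mechanism (simplified Row stand-in, not the real connector class):

    # Simplified stand-in for databricks.sql.types.Row, which raises
    # AttributeError from __getattr__ for any attribute it doesn't know.
    class Row(tuple):
        def __getattr__(self, item):
            raise AttributeError(item)

    def qualname(o):
        # Mirrors the failing line in airflow/utils/module_loading.py:47.
        return f"{o.__module__}.{o.__name__}"

    row = Row((42,))
    try:
        qualname(row)  # the instance has no __name__, so __getattr__ raises
    except AttributeError as e:
        print(f"AttributeError: {e}")  # -> AttributeError: __name__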

This is how I use it:

    get_max_id_task = DatabricksSqlOperator(
        databricks_conn_id=databricks_sql_conn_id,
        sql_endpoint_name='sql_endpoint',
        task_id='get_max_id',
        sql="SELECT cast(max(id) as STRING) FROM prod.unified.sessions",
        do_xcom_push=True
    )

Databricks provider versions (I use the latest):

apache-airflow-providers-common-sql==1.5.1
databricks-sql-connector==2.5.2
apache-airflow-providers-databricks==4.2.0

Airflow is 2.6.1 / Python 3.10.

UPDATE:
Replacing DatabricksSqlOperator with a simple PythonOperator and DatabricksSqlHook works just fine:

    from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook

    def get_max_id(ti):
        hook = DatabricksSqlHook(databricks_conn_id=databricks_sql_conn_id, sql_endpoint_name='sql_endpoint')
        sql = "SELECT cast(max(id) as STRING) FROM prod.unified.sessions"
        return str(hook.get_first(sql)[0])
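
Wired in like this (task_id is illustrative; Airflow 2 injects ti from the context automatically):

    from airflow.operators.python import PythonOperator

    get_max_id_task = PythonOperator(
        task_id="get_max_id",
        python_callable=get_max_id,
    )

The string return value is pushed to XCom as return_value, sidestepping serialization of the Row objects entirely.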

potiuk (Member) commented Jun 6, 2023

Can you please open a new issue, @Stormhand? This is quite a different issue with a similar stacktrace (cc: @alexott - maybe you can take a look and implement a fix for that one).

Stormhand (Author) commented Jun 7, 2023

> Can you please open a new issue, @Stormhand? This is quite a different issue with a similar stacktrace (cc: @alexott - maybe you can take a look and implement a fix for that one).

Thank you. Opened a new one: #31753
