Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made get_conn in JdbcHook threadsafe to avoid OSError: JVM is already started #44718

Merged
merged 20 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6751daa
refactor: Made get_conn in JdbcHook threadsafe to avoid OSError: JVM …
davidblain-infrabel Dec 6, 2024
d18abf9
Refactor: removed commented code
dabla Dec 6, 2024
bc5c960
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 6, 2024
9f99dd0
refactor: Reorganized imports
davidblain-infrabel Dec 9, 2024
33e40ca
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 9, 2024
04312af
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 9, 2024
e81d841
refactor: Added white line
davidblain-infrabel Dec 9, 2024
ca2acfd
refactor: Fixed static checks test JdbcHook
davidblain-infrabel Dec 9, 2024
a7b988a
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 9, 2024
02d8cf6
refactor: Added white line
davidblain-infrabel Dec 9, 2024
96d35b5
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 9, 2024
1d9d7bd
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 10, 2024
38b0093
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 10, 2024
4134bf0
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 10, 2024
1ae9d56
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 10, 2024
5afceb8
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 11, 2024
cc02de4
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 12, 2024
553155f
refactor: Refactored JdbcHook get_conn method using RLock as suggeste…
davidblain-infrabel Dec 13, 2024
100e39e
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 13, 2024
3fd0bda
Merge branch 'main' into feature/threadsafe-jdbc-hook-get-conn
dabla Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions providers/src/airflow/providers/jdbc/hooks/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from airflow.exceptions import AirflowException
from airflow.providers.common.sql.hooks.sql import DbApiHook
from wrapt import synchronized

if TYPE_CHECKING:
from airflow.models.connection import Connection
Expand Down Expand Up @@ -177,6 +178,7 @@ def get_sqlalchemy_engine(self, engine_kwargs=None):

return super().get_sqlalchemy_engine(engine_kwargs)

@synchronized
def get_conn(self) -> jaydebeapi.Connection:
conn: Connection = self.connection
host: str = conn.host
Expand Down
49 changes: 47 additions & 2 deletions providers/tests/jdbc/hooks/test_jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import json
import logging
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread
from time import sleep
from unittest import mock
from unittest.mock import Mock, patch
from unittest.mock import Mock, patch, MagicMock

import jaydebeapi
import pytest

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.jdbc.hooks.jdbc import JdbcHook, suppress_and_warn
Expand Down Expand Up @@ -54,10 +56,17 @@ def get_hook(
**conn_params,
}
)
jvm_started = False

class MockedJdbcHook(JdbcHook):
@classmethod
def get_connection(cls, conn_id: str) -> Connection:
# nonlocal jvm_started
#
# if jvm_started:
# raise OSError("JVM already started")
#
# jvm_started = True
return connection

hook = MockedJdbcHook(**hook_params)
Expand Down Expand Up @@ -229,3 +238,39 @@ def test_get_sqlalchemy_engine_verify_creator_is_being_used(self):
jdbc_hook.get_conn = lambda: connection
engine = jdbc_hook.get_sqlalchemy_engine()
assert engine.connect().connection.connection == connection

def test_get_conn_thread_safety(self):
mock_conn = MagicMock()
open_connections = 0

def connect_side_effect(*args, **kwargs):
nonlocal open_connections
open_connections += 1
logging.debug("Thread %s has %s open connections", current_thread().name, open_connections)

try:
if open_connections > 1:
raise OSError("JVM is already started")
finally:
sleep(0.1) # wait a bit before releasing the connection again
open_connections -= 1

return mock_conn

with patch.object(jaydebeapi, "connect", side_effect=connect_side_effect) as mock_connect:
jdbc_hook = get_hook()

def call_get_conn():
conn = jdbc_hook.get_conn()
assert conn is mock_conn

with ThreadPoolExecutor(max_workers=10) as executor:
futures = []

for _ in range(0, 10):
futures.append(executor.submit(call_get_conn))

for future in as_completed(futures):
future.result() # This will raise OSError if get_conn isn't threadsafe

assert mock_connect.call_count == 10
Loading