Skip to content

Commit

Permalink
Ensure Tableau connection is active to access wait_for_state (#20433)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericpp authored Dec 22, 2021
1 parent 20d8630 commit 636ae0a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
20 changes: 10 additions & 10 deletions airflow/providers/tableau/operators/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ def execute(self, context: dict) -> str:

response = method(resource_id)

job_id = response.id

if self.method == 'refresh':
if self.blocking_refresh:
if not tableau_hook.wait_for_state(
job_id=job_id,
check_interval=self.check_interval,
target_state=TableauJobFinishCode.SUCCESS,
):
raise TableauJobFailedException(f'The Tableau Refresh {self.resource} Job failed!')
job_id = response.id

if self.method == 'refresh':
if self.blocking_refresh:
if not tableau_hook.wait_for_state(
job_id=job_id,
check_interval=self.check_interval,
target_state=TableauJobFinishCode.SUCCESS,
):
raise TableauJobFailedException(f'The Tableau Refresh {self.resource} Job failed!')

return job_id

Expand Down
40 changes: 38 additions & 2 deletions tests/providers/tableau/operators/test_tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,26 @@ def test_execute_workbooks_blocking(self, mock_tableau_hook):
"""
Test execute workbooks blocking
"""
mock_signed_in = [False]

def mock_hook_enter():
mock_signed_in[0] = True
return mock_tableau_hook

def mock_hook_exit(exc_type, exc_val, exc_tb):
mock_signed_in[0] = False

def mock_wait_for_state(job_id, target_state, check_interval):
if not mock_signed_in[0]:
raise Exception('Not signed in')

return True

mock_tableau_hook.return_value.__enter__ = Mock(side_effect=mock_hook_enter)
mock_tableau_hook.return_value.__exit__ = Mock(side_effect=mock_hook_exit)
mock_tableau_hook.wait_for_state = Mock(side_effect=mock_wait_for_state)

mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
mock_tableau_hook.server.jobs.get_by_id = Mock(
return_value=Mock(finish_code=TableauJobFinishCode.SUCCESS.value)
)
Expand Down Expand Up @@ -123,8 +141,26 @@ def test_execute_datasources_blocking(self, mock_tableau_hook):
"""
Test execute datasources blocking
"""
mock_signed_in = [False]

def mock_hook_enter():
mock_signed_in[0] = True
return mock_tableau_hook

def mock_hook_exit(exc_type, exc_val, exc_tb):
mock_signed_in[0] = False

def mock_wait_for_state(job_id, target_state, check_interval):
if not mock_signed_in[0]:
raise Exception('Not signed in')

return True

mock_tableau_hook.return_value.__enter__ = Mock(side_effect=mock_hook_enter)
mock_tableau_hook.return_value.__exit__ = Mock(side_effect=mock_hook_exit)
mock_tableau_hook.wait_for_state = Mock(side_effect=mock_wait_for_state)

mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
operator = TableauOperator(find='ds_2', resource='datasources', **self.kwargs)

job_id = operator.execute(context={})
Expand Down

0 comments on commit 636ae0a

Please sign in to comment.