Skip to content

Commit

Permalink
Update test case to fit remote log handers
Browse files Browse the repository at this point in the history
  • Loading branch information
wangding01 committed Nov 25, 2021
1 parent 58a7bd3 commit c6be2a5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class TestS3TaskHandler(unittest.TestCase):
def setUp(self):
super().setUp()
self.remote_log_base = 's3://bucket/remote/log/location'
self.remote_log_location = 's3://bucket/remote/log/location/1.log'
self.remote_log_key = 'remote/log/location/1.log'
self.remote_log_location = 's3://bucket/remote/log/location/1_1.log'
self.remote_log_key = 'remote/log/location/1_1.log'
self.local_log_location = 'local/log/location'
self.filename_template = '{try_number}.log'
self.s3_task_handler = S3TaskHandler(
Expand All @@ -60,6 +60,7 @@ def setUp(self):
task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag)
self.ti = TaskInstance(task=task, execution_date=date)
self.ti.try_number = 1
self.ti.seq_num = 1
self.ti.state = State.RUNNING
self.addCleanup(self.dag.clear)

Expand Down Expand Up @@ -126,15 +127,16 @@ def test_set_context_not_raw(self):
self.s3_task_handler.set_context(self.ti)

self.assertTrue(self.s3_task_handler.upload_on_close)
mock_open.assert_called_once_with(os.path.abspath('local/log/location/1.log'), 'w')
mock_open.assert_called_once_with(os.path.abspath('local/log/location/1_1.log'), 'w')
mock_open().write.assert_not_called()

def test_read(self):
self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n')
log, metadata = self.s3_task_handler.read(self.ti)
number = f'{self.ti.seq_num}_{self.ti.try_number}'
log, metadata = self.s3_task_handler.read(self.ti, number)
self.assertEqual(
log[0][0][-1],
'*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n',
'*** Reading remote log from s3://bucket/remote/log/location/1_1.log.\nLog line\n\n',
)
self.assertEqual(metadata, [{'end_of_log': True}])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ def setUp(self) -> None:
task = DummyOperator(task_id="task_for_testing_gcs_task_handler")
self.ti = TaskInstance(task=task, execution_date=date)
self.ti.try_number = 1
self.ti.seq_num = 1
self.ti.state = State.RUNNING
self.remote_log_base = "gs://bucket/remote/log/location"
self.remote_log_location = "gs://my-bucket/path/to/1.log"
self.remote_log_location = "gs://my-bucket/path/to/1_1.log"
self.local_log_location = tempfile.mkdtemp()
self.filename_template = "{try_number}.log"
self.addCleanup(self.dag.clear)
Expand Down Expand Up @@ -76,14 +77,14 @@ def test_hook(self, mock_client, mock_creds):
@mock.patch("google.cloud.storage.Blob")
def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds):
mock_blob.from_string.return_value.download_as_string.return_value = "CONTENT"

logs, metadata = self.gcs_task_handler._read(self.ti, self.ti.try_number)
number = f'{self.ti.seq_num}_{self.ti.try_number}'
logs, metadata = self.gcs_task_handler._read(self.ti, number)
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
"gs://bucket/remote/log/location/1_1.log", mock_client.return_value
)

self.assertEqual(
"*** Reading remote log from gs://bucket/remote/log/location/1.log.\nCONTENT\n", logs
"*** Reading remote log from gs://bucket/remote/log/location/1_1.log.\nCONTENT\n", logs
)
self.assertEqual({"end_of_log": True}, metadata)

Expand All @@ -97,16 +98,17 @@ def test_should_read_from_local(self, mock_blob, mock_client, mock_creds):
mock_blob.from_string.return_value.download_as_string.side_effect = Exception("Failed to connect")

self.gcs_task_handler.set_context(self.ti)
log, metadata = self.gcs_task_handler._read(self.ti, self.ti.try_number)
number = f'{self.ti.seq_num}_{self.ti.try_number}'
log, metadata = self.gcs_task_handler._read(self.ti, number)

self.assertEqual(
log,
"*** Unable to read remote log from gs://bucket/remote/log/location/1.log\n*** "
f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1.log\n",
"*** Unable to read remote log from gs://bucket/remote/log/location/1_1.log\n*** "
f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1_1.log\n",
)
self.assertDictEqual(metadata, {"end_of_log": True})
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
"gs://bucket/remote/log/location/1_1.log", mock_client.return_value
)

@mock.patch(
Expand Down Expand Up @@ -134,9 +136,9 @@ def test_write_to_remote_on_close(self, mock_blob, mock_client, mock_creds):

mock_blob.assert_has_calls(
[
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().download_as_string(),
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().upload_from_string("CONTENT\nMESSAGE\n", content_type="text/plain"),
],
any_order=False,
Expand Down Expand Up @@ -164,14 +166,14 @@ def test_failed_write_to_remote_on_close(self, mock_blob, mock_client, mock_cred
'INFO:airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler:Previous '
'log discarded: sequence item 0: expected str instance, bytes found',
'ERROR:airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler:Could '
'not write logs to gs://bucket/remote/log/location/1.log: Failed to connect',
'not write logs to gs://bucket/remote/log/location/1_1.log: Failed to connect',
],
)
mock_blob.assert_has_calls(
[
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().download_as_string(),
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().upload_from_string(
"*** Previous log discarded: sequence item 0: expected str instance, bytes found\n\n",
content_type="text/plain",
Expand Down Expand Up @@ -205,9 +207,9 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli

mock_blob.assert_has_calls(
[
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().download_as_string(),
mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
mock.call.from_string("gs://bucket/remote/log/location/1_1.log", mock_client.return_value),
mock.call.from_string().upload_from_string(
"*** Previous log discarded: Fail to download\n\nMESSAGE\n", content_type="text/plain"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TestWasbTaskHandler(unittest.TestCase):
def setUp(self):
super().setUp()
self.wasb_log_folder = 'wasb://container/remote/log/location'
self.remote_log_location = 'remote/log/location/1.log'
self.remote_log_location = 'remote/log/location/1_1.log'
self.local_log_location = 'local/log/location'
self.container_name = "wasb-container"
self.filename_template = '{try_number}.log'
Expand All @@ -51,6 +51,7 @@ def setUp(self):
task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=self.dag)
self.ti = TaskInstance(task=task, execution_date=date)
self.ti.try_number = 1
self.ti.seq_num = 1
self.ti.state = State.RUNNING
self.addCleanup(self.dag.clear)

Expand Down Expand Up @@ -122,7 +123,7 @@ def test_wasb_read(self, mock_hook):
[
(
'',
'*** Reading remote log from wasb://container/remote/log/location/1.log.\n'
'*** Reading remote log from wasb://container/remote/log/location/1_1.log.\n'
'Log line\n',
)
]
Expand All @@ -142,7 +143,7 @@ def test_wasb_read_raises(self):
handler.wasb_read(self.remote_log_location, return_error=True)

mock_error.assert_called_once_with(
'Could not read logs from remote/log/location/1.log', exc_info=True
'Could not read logs from remote/log/location/1_1.log', exc_info=True
)

@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook")
Expand Down Expand Up @@ -185,5 +186,5 @@ def test_write_raises(self):
handler.wasb_write('text', self.remote_log_location, append=False)

mock_error.assert_called_once_with(
'Could not write logs to %s', 'remote/log/location/1.log', exc_info=True
'Could not write logs to %s', 'remote/log/location/1_1.log', exc_info=True
)

0 comments on commit c6be2a5

Please sign in to comment.