From 4a651ee9375a4ce12d5d3dd84f6f425adf9a8b8c Mon Sep 17 00:00:00 2001 From: Ashish Patel Date: Fri, 7 Jun 2024 18:24:32 +0530 Subject: [PATCH] sftp_sensor: fixing resource management with sensor (#40022) closes: #39922 Summary When a user runs the SFTPSensor operator with deferrable=True and path/newer_than, an SFTP connection is opened and then left open. The cause is that the get_mod_time method opens an SFTP connection but never closes it afterward. This change closes the connection when get_mod_time finishes. --- airflow/providers/sftp/hooks/sftp.py | 10 +++++++--- airflow/providers/sftp/sensors/sftp.py | 13 +++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py index 0907fba2eb49b..7a2e34a21561b 100644 --- a/airflow/providers/sftp/hooks/sftp.py +++ b/airflow/providers/sftp/hooks/sftp.py @@ -549,7 +549,7 @@ async def get_files_and_attrs_by_pattern( matched_files = [file for file in files_list if fnmatch(str(file.filename), fnmatch_pattern)] return matched_files - async def get_mod_time(self, path: str) -> str: + async def get_mod_time(self, path: str) -> str: # type: ignore[return] """ Make SFTP async connection. 
@@ -558,9 +558,10 @@ async def get_mod_time(self, path: str) -> str: :param path: full path to the remote file """ - ssh_conn = await self._get_conn() - sftp_client = await ssh_conn.start_sftp_client() + ssh_conn = None try: + ssh_conn = await self._get_conn() + sftp_client = await ssh_conn.start_sftp_client() ftp_mdtm = await sftp_client.stat(path) modified_time = ftp_mdtm.mtime mod_time = datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S") # type: ignore[arg-type] @@ -568,3 +569,6 @@ async def get_mod_time(self, path: str) -> str: return mod_time except asyncssh.SFTPNoSuchFile: raise AirflowException("No files matching") + finally: + if ssh_conn: + ssh_conn.close() diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index de3870937d43b..f56ad9341001d 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -111,6 +111,19 @@ def poke(self, context: Context) -> PokeReturnValue | bool: _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: files_found.append(actual_file_to_check) + self.log.info( + "File %s has modification time: '%s', which is newer than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) + else: + self.log.info( + "File %s has modification time: '%s', which is older than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) else: files_found.append(actual_file_to_check)