Skip to content

Commit

Permalink
Add "watch" option to re-create file sink if needed (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed May 2, 2022
1 parent 9c9d137 commit bd59ca4
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
`Unreleased`_
=============

- Add a new ``watch`` optional argument to file sinks in order to automatically re-create possibly deleted or changed file (`#471 <https://github.com/Delgan/loguru/issues/471>`_).
- Make ``patch()`` calls cumulative instead of overriding the possibly existing patching function (`#462 <https://github.com/Delgan/loguru/issues/462>`_).


Expand Down
86 changes: 66 additions & 20 deletions loguru/_file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import shutil
import string
from functools import partial
from stat import ST_DEV, ST_INO

from . import _string_parsers as string_parsers
from ._ctime_functions import get_ctime, set_ctime
Expand Down Expand Up @@ -144,6 +145,7 @@ def __init__(
retention=None,
compression=None,
delay=False,
watch=False,
mode="a",
buffering=1,
encoding="utf8",
Expand All @@ -162,40 +164,92 @@ def __init__(
self._file = None
self._file_path = None

self._watch = watch
self._file_dev = -1
self._file_ino = -1

if not delay:
self._initialize_file()
path = self._create_path()
self._create_dirs(path)
self._create_file(path)

def write(self, message):
if self._file is None:
self._initialize_file()
path = self._create_path()
self._create_dirs(path)
self._create_file(path)

if self._watch:
self._reopen_if_needed()

if self._rotation_function is not None and self._rotation_function(message, self._file):
self._terminate_file(is_rotating=True)

self._file.write(message)

def _prepare_new_path(self):
def stop(self):
if self._watch:
self._reopen_if_needed()

self._terminate_file(is_rotating=False)

async def complete(self):
pass

def _create_path(self):
path = self._path.format_map({"time": FileDateFormatter()})
path = os.path.abspath(path)
return os.path.abspath(path)

def _create_dirs(self, path):
dirname = os.path.dirname(path)
os.makedirs(dirname, exist_ok=True)
return path

def _initialize_file(self):
path = self._prepare_new_path()
def _create_file(self, path):
self._file = open(path, **self._kwargs)
self._file_path = path

if self._watch:
fileno = self._file.fileno()
result = os.fstat(fileno)
self._file_dev = result[ST_DEV]
self._file_ino = result[ST_INO]

def _close_file(self):
self._file.flush()
self._file.close()

self._file = None
self._file_path = None
self._file_dev = -1
self._file_ino = -1

def _reopen_if_needed(self):
# Implemented based on standard library:
# https://github.com/python/cpython/blob/cb589d1b/Lib/logging/handlers.py#L486
if not self._file:
return

filepath = self._file_path

try:
result = os.stat(filepath)
except FileNotFoundError:
result = None

if not result or result[ST_DEV] != self._file_dev or result[ST_INO] != self._file_ino:
self._close_file()
self._create_dirs(filepath)
self._create_file(filepath)

def _terminate_file(self, *, is_rotating=False):
old_path = self._file_path

if self._file is not None:
self._file.close()
self._file = None
self._file_path = None
self._close_file()

if is_rotating:
new_path = self._prepare_new_path()
new_path = self._create_path()
self._create_dirs(new_path)

if new_path == old_path:
creation_time = get_ctime(old_path)
Expand All @@ -218,16 +272,8 @@ def _terminate_file(self, *, is_rotating=False):
self._retention_function(list(logs))

if is_rotating:
file = open(new_path, **self._kwargs)
self._create_file(new_path)
set_ctime(new_path, datetime.now().timestamp())
self._file_path = new_path
self._file = file

def stop(self):
self._terminate_file(is_rotating=False)

async def complete(self):
pass

@staticmethod
def _make_glob_patterns(path):
Expand Down
3 changes: 3 additions & 0 deletions loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ def add(
delay : |bool|, optional
Whether the file should be created as soon as the sink is configured, or delayed until
first logged message. It defaults to ``False``.
watch : |bool|, optional
Whether or not the file should be watched and re-opened when deleted or changed (based
on its device and inode properties) by an external program. It defaults to ``False``.
mode : |str|, optional
The opening mode as for built-in |open| function. It defaults to ``"a"`` (open the
file in appending mode).
Expand Down
145 changes: 145 additions & 0 deletions tests/test_filesink_watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import os

import pytest
from loguru import logger


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_deleted_before_write_without_delay(tmpdir):
file = tmpdir.join("test.log")
logger.add(str(file), format="{message}", watch=True, delay=False)
os.remove(str(file))
logger.info("Test")
assert file.read() == "Test\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_deleted_before_write_with_delay(tmpdir):
file = tmpdir.join("test.log")
logger.add(str(file), format="{message}", watch=True, delay=True)
logger.info("Test 1")
os.remove(str(file))
logger.info("Test 2")
assert file.read() == "Test 2\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_path_containing_placeholder(tmpdir):
logger.add(str(tmpdir.join("test_{time}.log")), format="{message}", watch=True)
assert len(tmpdir.listdir()) == 1
filepath = tmpdir.listdir()[0]
os.remove(str(filepath))
logger.info("Test")
assert len(tmpdir.listdir()) == 1
assert filepath.read() == "Test\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_reopened_with_arguments(tmpdir):
file = tmpdir.join("test.log")
logger.add(str(file), format="{message}", watch=True, encoding="ascii", errors="replace")
os.remove(str(file))
logger.info("é")
assert file.read() == "?\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_manually_changed(tmpdir):
file = tmpdir.join("test.log")
logger.add(str(file), format="{message}", watch=True, mode="w")
os.remove(str(file))
file.write("Placeholder")
logger.info("Test")
assert file.read() == "Test\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_folder_deleted(tmpdir):
file = tmpdir.join("foo/bar/test.log")
logger.add(str(file), format="{message}", watch=True)
os.remove(str(file))
os.rmdir(str(tmpdir.join("foo/bar")))
logger.info("Test")
assert file.read() == "Test\n"


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_deleted_before_rotation(tmpdir):
exists = None
file = tmpdir.join("test.log")

def rotate(_, __):
nonlocal exists
exists = file.exists()
return False

logger.add(str(file), format="{message}", watch=True, rotation=rotate)
os.remove(str(file))
logger.info("Test")
assert exists is True


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_deleted_before_compression(tmpdir):
exists = None
file = tmpdir.join("test.log")

def compress(_):
nonlocal exists
exists = file.exists()
return False

logger.add(str(file), format="{message}", watch=True, compression=compress)
os.remove(str(file))
logger.remove()
assert exists is True


@pytest.mark.skipif(os.name == "nt", reason="Windows can't delete file in use")
def test_file_deleted_before_retention(tmpdir):
exists = None
file = tmpdir.join("test.log")

def retain(_):
nonlocal exists
exists = file.exists()
return False

logger.add(str(file), format="{message}", watch=True, retention=retain)
os.remove(str(file))
logger.remove()
assert exists is True


def test_file_correctly_reused_after_rotation(tmpdir):
rotate = iter((False, True, False))
filepath = tmpdir.join("test.log")
logger.add(
str(filepath),
format="{message}",
mode="w",
watch=True,
rotation=lambda _, __: next(rotate),
)
logger.info("Test 1")
logger.info("Test 2")
logger.info("Test 3")
assert len(tmpdir.listdir()) == 2
rotated = next(f for f in tmpdir.listdir() if f != filepath)
assert rotated.read() == "Test 1\n"
assert filepath.read() == "Test 2\nTest 3\n"


@pytest.mark.parametrize("delay", [True, False])
@pytest.mark.parametrize("compression", [None, lambda _: None])
def test_file_closed_without_being_logged(tmpdir, delay, compression):
filepath = tmpdir.join("test.log")
logger.add(
str(filepath),
format="{message}",
watch=True,
delay=delay,
compression=compression,
)
logger.remove()
assert filepath.exists() is (False if delay else True)

0 comments on commit bd59ca4

Please sign in to comment.