Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc clean improvements #4440

Merged
merged 11 commits into from
Oct 6, 2021
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ Fix ``cylc stop --kill`` which was not actually killing task jobs.
[#4338](https://github.com/cylc/cylc-flow/pull/4338) - Cylc install -C option
now works with relative paths.

[#4440](https://github.com/cylc/cylc-flow/pull/4440) -
Fix an error that could occur during remote clean and other `cylc clean`
improvements.

[#4445](https://github.com/cylc/cylc-flow/pull/4445) - Cylc will prevent you
from using the same name for a platform and a platform group. Which one it
should pick is ambiguous, so this is treated as a setup error.
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
FILE_INSTALL_LOG = 'cylc-rsync'

LOG = logging.getLogger(CYLC_LOG)
LOG.addHandler(logging.NullHandler()) # Start with a null handler
RSYNC_LOG = logging.getLogger(FILE_INSTALL_LOG)
RSYNC_LOG.addHandler(logging.NullHandler())
# Start with a null handler
for log in (LOG, RSYNC_LOG):
log.addHandler(logging.NullHandler())

LOG_LEVELS = {
"INFO": logging.INFO,
Expand Down
59 changes: 50 additions & 9 deletions cylc/flow/loggingutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@
because "time.strftime" will handle time zone from "localtime" properly.
"""
from contextlib import suppress
from functools import partial
from glob import glob
import logging
import os
import re
import sys
import logging
import textwrap

from glob import glob
from functools import partial
from typing import Optional

from ansimarkup import parse as cparse

from cylc.flow.wallclock import (get_current_time_string,
get_time_string_from_unix_time)
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.wallclock import (
get_current_time_string, get_time_string_from_unix_time
)


class CylcLogFormatter(logging.Formatter):
Expand All @@ -55,12 +56,16 @@ class CylcLogFormatter(logging.Formatter):

# default hard-coded max width for log entries
# NOTE: this should be sufficiently long that log entries read by the
# deamonise script (url, pid) are not wrapped
# daemonise script (url, pid) are not wrapped
MAX_WIDTH = 999

def __init__(
self, timestamp=True, color=False, max_width=None, dev_info=False
):
self,
timestamp: bool = True,
color: bool = False,
max_width: Optional[int] = None,
dev_info: bool = False
) -> None:
self.timestamp = None
self.color = None
self.max_width = self.MAX_WIDTH
Expand Down Expand Up @@ -249,3 +254,39 @@ def re_formatter(log_string):
for sub, repl in LOG_LEVEL_REGEXES:
log_string = sub.sub(repl, log_string)
return log_string


def disable_timestamps(logger: logging.Logger) -> None:
    """Switch off timestamps on a logger's Cylc-formatted handlers.

    Each handler attached to ``logger`` whose formatter is a
    ``CylcLogFormatter`` has that formatter reconfigured with
    ``timestamp=False``. Handlers with any other formatter (or none) are
    left untouched.

    Args:
        logger: The logger whose handlers should be adjusted.
    """
    for formatter in (handler.formatter for handler in logger.handlers):
        if not isinstance(formatter, CylcLogFormatter):
            # Not a Cylc formatter (or no formatter at all): nothing to do.
            continue
        formatter.configure(timestamp=False)


def setup_segregated_log_streams(
    logger: logging.Logger, stderr_handler: logging.StreamHandler
) -> None:
    """Split a logger's output between stdout and stderr by severity.

    A new stdout handler is added to ``logger``. It accepts records from
    DEBUG upwards but filters out anything at WARNING level or above, and
    it reuses the formatter of ``stderr_handler``. The level of
    ``stderr_handler`` is raised to WARNING so that it only emits warnings
    and above.

    Args:
        logger: The logger to modify.
        stderr_handler: The existing stderr stream handler.
    """
    def below_warning(record: logging.LogRecord) -> int:
        """Return 1 for records less severe than WARNING, otherwise 0."""
        return int(record.levelno < logging.WARNING)

    # The existing handler now only deals with warnings and above.
    stderr_handler.setLevel(logging.WARNING)

    info_handler = logging.StreamHandler(sys.stdout)
    info_handler.setFormatter(stderr_handler.formatter)
    # Keep >= warnings off stdout
    info_handler.addFilter(below_warning)
    info_handler.setLevel(logging.DEBUG)
    logger.addHandler(info_handler)


def close_log(logger: logging.Logger) -> None:
    """Close every handler currently attached to the given logger.

    This function only calls ``close()`` on each handler; it does not
    remove the handlers from the logger.

    Args:
        logger: The logger whose handlers should be closed.
    """
    for handler in logger.handlers:
        try:
            handler.close()
        except OSError:  # IOError is an alias of OSError
            # Ignore the error, avoiding the traceback which `logging`
            # might try to write to the log we are trying to close.
            pass
50 changes: 36 additions & 14 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import re
from ansimarkup import parse as cparse
import sys
from typing import Any, Dict, Optional, List
from typing import Any, Dict, Optional, List, Tuple

from cylc.flow import LOG, RSYNC_LOG
import cylc.flow.flags
from cylc.flow.loggingutil import CylcLogFormatter
from cylc.flow.loggingutil import (
CylcLogFormatter,
setup_segregated_log_streams,
)


def format_shell_examples(string):
Expand Down Expand Up @@ -156,9 +159,20 @@ class CylcOptionParser(OptionParser):
MULTITASK_USAGE = MULTI_USAGE_TEMPLATE.format(
WITHOUT_CYCLE_GLOBS, WITHOUT_CYCLE_EXAMPLES)

def __init__(self, usage, argdoc=None, comms=False,
jset=False, multitask=False, multitask_nocycles=False,
prep=False, auto_add=True, icp=False, color=True):
def __init__(
self,
usage: str,
argdoc: Optional[List[Tuple[str, str]]] = None,
comms: bool = False,
jset: bool = False,
multitask: bool = False,
multitask_nocycles: bool = False,
prep: bool = False,
auto_add: bool = True,
icp: bool = False,
color: bool = True,
segregated_log: bool = False
) -> None:

self.auto_add = auto_add
if argdoc is None:
Expand All @@ -184,8 +198,10 @@ def __init__(self, usage, argdoc=None, comms=False,
self.jset = jset
self.prep = prep
self.icp = icp
self.workflow_info = []
self.color = color
# Whether to log messages that are below warning level to stdout
# instead of stderr:
self.segregated_log = segregated_log

maxlen = 0
for arg in argdoc:
Expand Down Expand Up @@ -388,21 +404,27 @@ def parse_args(self, api_args, remove_opts=None):
# better choice for the logging stream. This allows us to use STDOUT
# for verbosity agnostic outputs.
# 2. Scheduler will remove this handler when it becomes a daemon.
if options.verbosity > 1:
if options.verbosity < 0:
LOG.setLevel(logging.WARNING)
elif options.verbosity > 0:
LOG.setLevel(logging.DEBUG)
else:
LOG.setLevel(logging.INFO)
# Remove the NullHandler before adding the StreamHandler
RSYNC_LOG.setLevel(logging.INFO)
while LOG.handlers:
LOG.handlers[0].close()
LOG.removeHandler(LOG.handlers[0])
errhandler = logging.StreamHandler(sys.stderr)
errhandler.setFormatter(CylcLogFormatter(
# Remove the NullHandler before adding the StreamHandler
for log in (LOG, RSYNC_LOG):
while log.handlers:
log.handlers[0].close()
log.removeHandler(log.handlers[0])
log_handler = logging.StreamHandler(sys.stderr)
log_handler.setFormatter(CylcLogFormatter(
timestamp=options.log_timestamp,
dev_info=bool(options.verbosity > 2)
))
LOG.addHandler(errhandler)
LOG.addHandler(log_handler)

if self.segregated_log:
setup_segregated_log_streams(LOG, log_handler)

return (options, args)

Expand Down
24 changes: 14 additions & 10 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,18 @@ def remove_dir_and_target(path: Union[Path, str]) -> None:
if os.path.islink(path):
if os.path.exists(path):
target = os.path.realpath(path)
LOG.debug(
f'Removing symlink target directory: ({path} ->) {target}')
LOG.info(
f'Removing symlink target directory: ({path} ->) {target}'
)
rmtree(target, onerror=handle_rmtree_err)
LOG.debug(f'Removing symlink: {path}')
LOG.info(f'Removing symlink: {path}')
else:
LOG.debug(f'Removing broken symlink: {path}')
LOG.info(f'Removing broken symlink: {path}')
os.remove(path)
elif not os.path.exists(path):
raise FileNotFoundError(path)
else:
LOG.debug(f'Removing directory: {path}')
LOG.info(f'Removing directory: {path}')
rmtree(path, onerror=handle_rmtree_err)


Expand All @@ -291,13 +292,13 @@ def remove_dir_or_file(path: Union[Path, str]) -> None:
if not os.path.isabs(path):
raise ValueError("Path must be absolute")
if os.path.islink(path):
LOG.debug(f"Removing symlink: {path}")
LOG.info(f"Removing symlink: {path}")
os.remove(path)
elif os.path.isfile(path):
LOG.debug(f"Removing file: {path}")
LOG.info(f"Removing file: {path}")
os.remove(path)
else:
LOG.debug(f"Removing directory: {path}")
LOG.info(f"Removing directory: {path}")
rmtree(path, onerror=handle_rmtree_err)


Expand Down Expand Up @@ -330,7 +331,7 @@ def remove_empty_parents(
continue
try:
parent.rmdir()
LOG.debug(f'Removing directory: {parent}')
LOG.info(f'Removing directory: {parent}')
except OSError:
break

Expand Down Expand Up @@ -383,7 +384,10 @@ def parse_rm_dirs(rm_dirs: Iterable[str]) -> Set[str]:
part = os.path.normpath(part)
if os.path.isabs(part):
raise UserInputError("--rm option cannot take absolute paths")
if part == '.' or part.startswith(f'..{os.sep}'):
if (
part in {os.curdir, os.pardir} or
part.startswith(f"{os.pardir}{os.sep}") # '../'
):
raise UserInputError(
"--rm option cannot take paths that point to the "
"run directory or above"
Expand Down
13 changes: 13 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ async def configure(self):
"""
self.profiler.log_memory("scheduler.py: start configure")

# Print workflow name to disambiguate in case of inferred run number
LOG.info(f"Workflow: {self.workflow}")

self.is_restart = self.workflow_db_mgr.restart_check()
# Note: since cylc play replaced cylc run/restart, we wait until this
# point before setting self.is_restart as we couldn't tell if
Expand Down Expand Up @@ -537,6 +540,12 @@ async def log_start(self):
else:
n_restart = 0

is_quiet = (cylc.flow.flags.verbosity < 0)
log_level = LOG.getEffectiveLevel()
if is_quiet:
# Temporarily change logging level to log important info
LOG.setLevel(logging.INFO)

log_extra = {TimestampRotatingFileHandler.FILE_HEADER_FLAG: True}
log_extra_num = {
TimestampRotatingFileHandler.FILE_HEADER_FLAG: True,
Expand Down Expand Up @@ -569,6 +578,10 @@ async def log_start(self):
'Start point: %s', self.config.start_point, extra=log_extra)
LOG.info('Final point: %s', self.config.final_point, extra=log_extra)

if is_quiet:
LOG.info("Quiet mode on")
LOG.setLevel(log_level)

async def start_scheduler(self):
"""Start the scheduler main loop."""
try:
Expand Down
30 changes: 11 additions & 19 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from ansimarkup import parse as cparse
import asyncio
from contextlib import suppress
from functools import lru_cache
import sys
from typing import TYPE_CHECKING
Expand All @@ -27,7 +26,10 @@
import cylc.flow.flags
from cylc.flow.host_select import select_workflow_host
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.loggingutil import TimestampRotatingFileHandler
from cylc.flow.loggingutil import (
close_log,
TimestampRotatingFileHandler,
)
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
Expand Down Expand Up @@ -251,25 +253,15 @@ def _open_logs(reg, no_detach):
while LOG.handlers:
LOG.handlers[0].close()
LOG.removeHandler(LOG.handlers[0])
workflow_log_handler = get_workflow_run_log_name(reg)
log_path = get_workflow_run_log_name(reg)
LOG.addHandler(
TimestampRotatingFileHandler(
workflow_log_handler,
no_detach))

TimestampRotatingFileHandler(log_path, no_detach)
)
# Add file installation log
file_install_log_path = get_workflow_file_install_log_name(reg)
handler = TimestampRotatingFileHandler(file_install_log_path, no_detach)
RSYNC_LOG.addHandler(handler)


def _close_logs():
    """Close each handler attached to the Cylc ``LOG`` logger.

    Only ``close()`` is called on each handler; the handlers are not
    removed from ``LOG``.
    """
    for log_handler in LOG.handlers:
        try:
            log_handler.close()
        except OSError:  # IOError is an alias of OSError
            # Ignore the error, avoiding the traceback which `logging`
            # might try to write to the log we are trying to close.
            pass
RSYNC_LOG.addHandler(
TimestampRotatingFileHandler(file_install_log_path, no_detach)
)


def scheduler_cli(options: 'Values', reg: str) -> None:
Expand Down Expand Up @@ -345,7 +337,7 @@ def scheduler_cli(options: 'Values', reg: str) -> None:
# NOTE: any threads which include sleep statements could cause
# sys.exit to hang if not shutdown properly
LOG.info("DONE")
_close_logs()
close_log(LOG)
sys.exit(ret)


Expand Down
Loading