Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

main loop: fix log_main_loop and add log_db plugin #4674

Merged
merged 5 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions .github/workflows/test_conda-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
pull_request:
paths:
- 'conda-environment.yml'
- '.github/workflows/test_conda-build.yml'
schedule:
- cron: '17 22 * * 6'
workflow_dispatch:
Expand All @@ -18,19 +19,31 @@ jobs:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to use actions/setup-python?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think so, this action doesn't use Python?

- name: build conda env
run: |
# The appended lines make sure that we actually build with Cylc
# too:
echo " - pip" >> conda-evironment.yml
echo " - pip": >> conda-environment.yml
echo " - ." >> conda-environment.yml
cat conda-environment.yml
conda env create \
-f conda-environment.yml
. /usr/share/miniconda/etc/profile.d/conda.sh
# write environment file
env_file='conda-environment.yml'
echo " - pip" >> "$env_file" # list pip as a dependency
echo " - pip:" >> "$env_file" # add a pip section
echo " - ." >> "$env_file" # install cylc-flow (pip install .)
cat "$env_file"
# install environment
conda env create \
-f "$env_file" \
--prefix cylc-dev
. /usr/share/miniconda/etc/profile.d/conda.sh
# check cylc-flow was installed correctly
conda run --prefix cylc-dev cylc version --long

- name: check cylc-version
- name: check for activate scripts
run: |
find /usr/share/miniconda/envs/cylc-dev/. -name "activate.d" | tee > activates.txt
# https://github.com/cylc/cylc-flow/issues/3704#issuecomment-897442365
# locate all activate scripts
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
find ./cylc-dev/ -name "activate.d" | tee > activates.txt
# ignore the conda activate script itself
sed -i '/cylc-dev\/etc\/conda\/activate.d/d' activates.txt
# check to make sure no packages have contributed new activate scripts
# (we rely on having a conda activate-less environment)
if [[ $(cat activates.txt | wc -l) -ne 0 ]]; then
echo '::error::Found activate scripts in installation.'
cat activates.txt >&2
exit 1
fi
1 change: 1 addition & 0 deletions conda-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ dependencies:
#- pandas >=1.0,<2
#- pympler
#- matplotlib-base
#- sqlparse
6 changes: 3 additions & 3 deletions cylc/flow/main_loop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ def load(config, additional_plugins=None):
)[(plugin_name, coro_name)] = coro
plugins['timings'][(plugin_name, coro_name)] = deque(maxlen=1)
LOG.debug(
'Loaded main loop plugin "%s": %s',
plugin_name + '\n',
'\n'.join((f'* {x}' for x in log))
'Loaded main loop plugin "%s":\n%s',
plugin_name,
'\n'.join(f'* {x}' for x in log)
)
# set the initial state of the plugin
plugins['state'][plugin_name] = {}
Expand Down
94 changes: 94 additions & 0 deletions cylc/flow/main_loop/log_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Log all database transactions.

.. note::

This plugin is for Cylc developers debugging database issues.

Writes an SQL file into the workflow run directory on shutown.

"""

import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

import sqlparse

from cylc.flow import CYLC_LOG
from cylc.flow.main_loop import (startup, shutdown)


DB_LOG = logging.getLogger(f'{CYLC_LOG}-db')


def _format(sql_string):
"""Pretty print an SQL statement."""
wxtim marked this conversation as resolved.
Show resolved Hide resolved
return '\n'.join(
sqlparse.format(
statement,
reindent_aligned=True,
use_space_around_operators=True,
strip_comments=True,
keyword_case='upper',
identifier_case='lower',
)
for statement in sqlparse.split(sql_string)
) + '\n'


def _log(sql_string):
"""Log a SQL string."""
DB_LOG.info(_format(sql_string))


def _patch_db_connect(db_connect_method):
"""Patch the workflow DAO to configure logging.

We patch the connect method so that any subsequent re-connections
are also patched.
"""
def _inner(*args, **kwargs):
nonlocal db_connect_method
conn = db_connect_method(*args, **kwargs)
conn.set_trace_callback(_log)
return conn
return _inner


@startup
async def init(scheduler, state):
# configure log handler
DB_LOG.setLevel(logging.INFO)
handler = RotatingFileHandler(
str(Path(scheduler.workflow_run_dir, f'{__name__}.sql')),
maxBytes=1000000,
)
state['log_handler'] = handler
DB_LOG.addHandler(handler)

# configure the DB manager to log all DB operations
scheduler.workflow_db_mgr.pri_dao.connect = _patch_db_connect(
scheduler.workflow_db_mgr.pri_dao.connect
)


@shutdown
async def stop(scheduler, state):
handler = state.get('log_handler')
if handler:
handler.close()
55 changes: 29 additions & 26 deletions cylc/flow/main_loop/log_main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import json
from pathlib import Path

from cylc.flow.main_loop import (startup, shutdown)
from cylc.flow.main_loop import startup, shutdown

try:
import matplotlib
Expand All @@ -40,47 +40,45 @@

@startup
async def init(scheduler, _):
"""Patch the timings for each plugin to use an unlimited deque."""
for state in scheduler.main_loop_plugins['state'].values():
state['timings'] = deque()
"""Override default queue length of 1.

This allows timings to accumulate, normally only the most recent is kept.
"""
plugins = scheduler.main_loop_plugins
for plugin in plugins['timings']:
plugins['timings'][plugin] = deque()


@shutdown
async def report(scheduler, _):
"""Extract plugin function timings."""
data = _transpose(scheduler.main_loop_plugins['state'])
data = _normalise(data)
_plot(data, scheduler.workflow_run_dir)


def _transpose(all_states):
return {
plugin: tuple(zip(*state['timings']))
for plugin, state
in all_states.items()
if state['timings']
}
data = scheduler.main_loop_plugins['timings']
if data:
data = _normalise(data)
_dump(data, scheduler.workflow_run_dir)
_plot(data, scheduler.workflow_run_dir)


def _normalise(data):
earliest_time = min((
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would I be unreasonable in asking for docstrings for these functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • _normalise normalises the data.
  • _dump json.dumps the data.
  • _plot plots the data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the meaning of "normalise" supposed to be intuitively obvious, without reading the code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the unit test does have a docstring 😁

def test_normalise(test_data):
    """Ensure we correctly normalise the timings against the earliest time."""

time
for _, (times, _) in data.items()
for time in times
start_time
for _, timings in data.items()
for start_time, duration in timings
))
return {
plugin_name: (
tuple((time - earliest_time for time in times)),
y_data
)
for plugin_name, (times, y_data) in data.items()
plugin_name: [
(start_time - earliest_time, duration)
for start_time, duration in timings
]
for (plugin_name, _), timings in data.items()
}


def _dump(data, path):
json.dump(
data,
Path(path, f'{__name__}.json').open('w+')
Path(path, f'{__name__}.json').open('w+'),
indent=4
)
return True

Expand All @@ -93,7 +91,12 @@ def _plot(data, path):
ax1.set_xlabel('Workflow Run Time (s)')
ax1.set_ylabel('XTrigger Run Time (s)')

for plugin_name, (x_data, y_data) in data.items():
for plugin_name, (timings) in data.items():
x_data = []
y_data = []
for start_time, duration in timings:
x_data.append(start_time)
y_data.append(duration)
ax1.scatter(x_data, y_data, label=plugin_name)

ax1.set_xlim(0, ax1.get_xlim()[1])
Expand Down
4 changes: 4 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ main_loop-log_main_loop =
main_loop-log_memory =
pympler
matplotlib
main_loop-log_db =
sqlparse
report-timings =
pandas==1.*
matplotlib
Expand Down Expand Up @@ -131,6 +133,7 @@ all =
%(empy)s
%(graph)s
%(main_loop-log_data_store)s
%(main_loop-log_db)s
%(main_loop-log_main_loop)s
%(main_loop-log_memory)s
%(report-timings)s
Expand Down Expand Up @@ -195,6 +198,7 @@ cylc.main_loop =
health_check = cylc.flow.main_loop.health_check
auto_restart = cylc.flow.main_loop.auto_restart
log_data_store = cylc.flow.main_loop.log_data_store [main_loop-log_data_store]
log_db = cylc.flow.main_loop.log_db [main_loop-log_db]
log_main_loop = cylc.flow.main_loop.log_main_loop [main_loop-log_main_loop]
log_memory = cylc.flow.main_loop.log_memory [main_loop-log_memory]
reset_bad_hosts = cylc.flow.main_loop.reset_bad_hosts
Expand Down
72 changes: 72 additions & 0 deletions tests/functional/rnd/05-main-loop.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Test the development main-loop plugins to ensure they can run to completion

. "$(dirname "$0")/test_header"

expected_log_files=(
# these are the files they should produce
cylc.flow.main_loop.log_data_store.json
cylc.flow.main_loop.log_data_store.pdf
cylc.flow.main_loop.log_db.sql
cylc.flow.main_loop.log_main_loop.json
cylc.flow.main_loop.log_main_loop.pdf
cylc.flow.main_loop.log_memory.json
cylc.flow.main_loop.log_memory.pdf
)

set_test_number $(( 1 + ${#expected_log_files[@]} ))

init_workflow "${TEST_NAME_BASE}" <<__FLOW_CYLC__
[scheduler]
# make sure periodic plugins actually run
[[main loop]]
[[[log data store]]]
interval = PT1S
[[[log main loop]]]
interval = PT1S
[[[log memory]]]
interval = PT1S

[scheduling]
[[graph]]
R1 = a

[runtime]
[[a]]
script = sleep 5
__FLOW_CYLC__

# run a workflow with all the development main-loop plugins turned on
run_ok "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" \
-n \
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
--debug \
--main-loop 'log data store' \
--main-loop 'log db' \
--main-loop 'log main loop' \
--main-loop 'log memory'

# check the expected files are generated
for log_file in "${expected_log_files[@]}"; do
file_path="${HOME}/cylc-run/${WORKFLOW_NAME}/${log_file}"
run_ok "${TEST_NAME_BASE}.${log_file}" \
stat "${file_path}"
done

purge
41 changes: 41 additions & 0 deletions tests/unit/main_loop/test_log_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from textwrap import dedent

from cylc.flow.main_loop.log_db import _format


def test_format():
"""It should indent, fix case and strip comments in SQL statements."""
assert _format('''
select a, b, c, d, e, f, g
from table_1 left join table_2
where a = 1 and b = 2 and c = 3
# whatever
''') == dedent('''
SELECT a,
b,
c,
d,
e,
f,
g
FROM table_1
LEFT JOIN table_2
WHERE a = 1
AND b = 2
AND c = 3
'''[1:])
Loading