-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
main loop: fix log_main_loop and add log_db plugin #4674
Changes from all commits
c339913
ecb43d3
7d61274
91e7c97
2ca2b2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,3 +27,4 @@ dependencies: | |
#- pandas >=1.0,<2 | ||
#- pympler | ||
#- matplotlib-base | ||
#- sqlparse |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. | ||
# Copyright (C) NIWA & British Crown (Met Office) & Contributors. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
"""Log all database transactions. | ||
|
||
.. note:: | ||
|
||
This plugin is for Cylc developers debugging database issues. | ||
|
||
Writes an SQL file into the workflow run directory on shutown. | ||
|
||
""" | ||
|
||
import logging | ||
from logging.handlers import RotatingFileHandler | ||
from pathlib import Path | ||
|
||
import sqlparse | ||
|
||
from cylc.flow import CYLC_LOG | ||
from cylc.flow.main_loop import (startup, shutdown) | ||
|
||
|
||
DB_LOG = logging.getLogger(f'{CYLC_LOG}-db') | ||
|
||
|
||
def _format(sql_string): | ||
"""Pretty print an SQL statement.""" | ||
wxtim marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return '\n'.join( | ||
sqlparse.format( | ||
statement, | ||
reindent_aligned=True, | ||
use_space_around_operators=True, | ||
strip_comments=True, | ||
keyword_case='upper', | ||
identifier_case='lower', | ||
) | ||
for statement in sqlparse.split(sql_string) | ||
) + '\n' | ||
|
||
|
||
def _log(sql_string): | ||
"""Log a SQL string.""" | ||
DB_LOG.info(_format(sql_string)) | ||
|
||
|
||
def _patch_db_connect(db_connect_method): | ||
"""Patch the workflow DAO to configure logging. | ||
|
||
We patch the connect method so that any subsequent re-connections | ||
are also patched. | ||
""" | ||
def _inner(*args, **kwargs): | ||
nonlocal db_connect_method | ||
conn = db_connect_method(*args, **kwargs) | ||
conn.set_trace_callback(_log) | ||
return conn | ||
return _inner | ||
|
||
|
||
@startup | ||
async def init(scheduler, state): | ||
# configure log handler | ||
DB_LOG.setLevel(logging.INFO) | ||
handler = RotatingFileHandler( | ||
str(Path(scheduler.workflow_run_dir, f'{__name__}.sql')), | ||
maxBytes=1000000, | ||
) | ||
state['log_handler'] = handler | ||
DB_LOG.addHandler(handler) | ||
|
||
# configure the DB manager to log all DB operations | ||
scheduler.workflow_db_mgr.pri_dao.connect = _patch_db_connect( | ||
scheduler.workflow_db_mgr.pri_dao.connect | ||
) | ||
|
||
|
||
@shutdown | ||
async def stop(scheduler, state): | ||
handler = state.get('log_handler') | ||
if handler: | ||
handler.close() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
import json | ||
from pathlib import Path | ||
|
||
from cylc.flow.main_loop import (startup, shutdown) | ||
from cylc.flow.main_loop import startup, shutdown | ||
|
||
try: | ||
import matplotlib | ||
|
@@ -40,47 +40,45 @@ | |
|
||
@startup | ||
async def init(scheduler, _): | ||
"""Patch the timings for each plugin to use an unlimited deque.""" | ||
for state in scheduler.main_loop_plugins['state'].values(): | ||
state['timings'] = deque() | ||
"""Override default queue length of 1. | ||
|
||
This allows timings to accumulate, normally only the most recent is kept. | ||
""" | ||
plugins = scheduler.main_loop_plugins | ||
for plugin in plugins['timings']: | ||
plugins['timings'][plugin] = deque() | ||
|
||
|
||
@shutdown | ||
async def report(scheduler, _): | ||
"""Extract plugin function timings.""" | ||
data = _transpose(scheduler.main_loop_plugins['state']) | ||
data = _normalise(data) | ||
_plot(data, scheduler.workflow_run_dir) | ||
|
||
|
||
def _transpose(all_states): | ||
return { | ||
plugin: tuple(zip(*state['timings'])) | ||
for plugin, state | ||
in all_states.items() | ||
if state['timings'] | ||
} | ||
data = scheduler.main_loop_plugins['timings'] | ||
if data: | ||
data = _normalise(data) | ||
_dump(data, scheduler.workflow_run_dir) | ||
_plot(data, scheduler.workflow_run_dir) | ||
|
||
|
||
def _normalise(data): | ||
earliest_time = min(( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would I be unreasonable in asking for docstrings for these functions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the meaning of "normalise" supposed to be intuitively obvious, without reading the code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see the unit test does have a docstring 😁 def test_normalise(test_data):
"""Ensure we correctly normalise the timings against the earliest time.""" |
||
time | ||
for _, (times, _) in data.items() | ||
for time in times | ||
start_time | ||
for _, timings in data.items() | ||
for start_time, duration in timings | ||
)) | ||
return { | ||
plugin_name: ( | ||
tuple((time - earliest_time for time in times)), | ||
y_data | ||
) | ||
for plugin_name, (times, y_data) in data.items() | ||
plugin_name: [ | ||
(start_time - earliest_time, duration) | ||
for start_time, duration in timings | ||
] | ||
for (plugin_name, _), timings in data.items() | ||
} | ||
|
||
|
||
def _dump(data, path): | ||
json.dump( | ||
data, | ||
Path(path, f'{__name__}.json').open('w+') | ||
Path(path, f'{__name__}.json').open('w+'), | ||
indent=4 | ||
) | ||
return True | ||
|
||
|
@@ -93,7 +91,12 @@ def _plot(data, path): | |
ax1.set_xlabel('Workflow Run Time (s)') | ||
ax1.set_ylabel('XTrigger Run Time (s)') | ||
|
||
for plugin_name, (x_data, y_data) in data.items(): | ||
for plugin_name, (timings) in data.items(): | ||
x_data = [] | ||
y_data = [] | ||
for start_time, duration in timings: | ||
x_data.append(start_time) | ||
y_data.append(duration) | ||
ax1.scatter(x_data, y_data, label=plugin_name) | ||
|
||
ax1.set_xlim(0, ax1.get_xlim()[1]) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
#!/usr/bin/env bash | ||
oliver-sanders marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. | ||
# Copyright (C) NIWA & British Crown (Met Office) & Contributors. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
# Test the development main-loop plugins to ensure they can run to completion | ||
|
||
. "$(dirname "$0")/test_header" | ||
|
||
expected_log_files=( | ||
# these are the files they should produce | ||
cylc.flow.main_loop.log_data_store.json | ||
cylc.flow.main_loop.log_data_store.pdf | ||
cylc.flow.main_loop.log_db.sql | ||
cylc.flow.main_loop.log_main_loop.json | ||
cylc.flow.main_loop.log_main_loop.pdf | ||
cylc.flow.main_loop.log_memory.json | ||
cylc.flow.main_loop.log_memory.pdf | ||
) | ||
|
||
set_test_number $(( 1 + ${#expected_log_files[@]} )) | ||
|
||
init_workflow "${TEST_NAME_BASE}" <<__FLOW_CYLC__ | ||
[scheduler] | ||
# make sure periodic plugins actually run | ||
[[main loop]] | ||
[[[log data store]]] | ||
interval = PT1S | ||
[[[log main loop]]] | ||
interval = PT1S | ||
[[[log memory]]] | ||
interval = PT1S | ||
|
||
[scheduling] | ||
[[graph]] | ||
R1 = a | ||
|
||
[runtime] | ||
[[a]] | ||
script = sleep 5 | ||
__FLOW_CYLC__ | ||
|
||
# run a workflow with all the development main-loop plugins turned on | ||
run_ok "${TEST_NAME_BASE}-run" \ | ||
cylc play "${WORKFLOW_NAME}" \ | ||
-n \ | ||
oliver-sanders marked this conversation as resolved.
Show resolved
Hide resolved
|
||
--debug \ | ||
--main-loop 'log data store' \ | ||
--main-loop 'log db' \ | ||
--main-loop 'log main loop' \ | ||
--main-loop 'log memory' | ||
|
||
# check the expected files are generated | ||
for log_file in "${expected_log_files[@]}"; do | ||
file_path="${HOME}/cylc-run/${WORKFLOW_NAME}/${log_file}" | ||
run_ok "${TEST_NAME_BASE}.${log_file}" \ | ||
stat "${file_path}" | ||
done | ||
|
||
purge |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. | ||
# Copyright (C) NIWA & British Crown (Met Office) & Contributors. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
from textwrap import dedent | ||
|
||
from cylc.flow.main_loop.log_db import _format | ||
|
||
|
||
def test_format(): | ||
"""It should indent, fix case and strip comments in SQL statements.""" | ||
assert _format(''' | ||
select a, b, c, d, e, f, g | ||
from table_1 left join table_2 | ||
where a = 1 and b = 2 and c = 3 | ||
# whatever | ||
''') == dedent(''' | ||
SELECT a, | ||
b, | ||
c, | ||
d, | ||
e, | ||
f, | ||
g | ||
FROM table_1 | ||
LEFT JOIN table_2 | ||
WHERE a = 1 | ||
AND b = 2 | ||
AND c = 3 | ||
'''[1:]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May want to use
actions/setup-python
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think so, this action doesn't use Python?