
Commit

[SARC-328] Implement alerts: number of CPU/GPU jobs (active or inactive) on a cluster over a period X (#128)

* [SARC-328] Implement alerts: number of CPU/GPU jobs (active or inactive) on a cluster over a period X

* Rebase and update comments.

* Fix a column name

* Select a sub-dataframe with the given cluster names to compute stats, then use the full dataframe to check warnings.
  Add supplementary tests.

* - Rename files.
  - Remove the `exclude` parameter and use only `cluster_names` for both including and excluding clusters from checking.

* Compute statistics for each cluster separately.

* Use file_regression for tests.
notoraptor authored Sep 18, 2024
1 parent 6b6c0f2 commit 0e6ba5c
Showing 7 changed files with 333 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -76,7 +76,8 @@ disable = [
     "invalid-name",
     "no-else-return", # Bad rule IMO (- OB)
     "line-too-long", # Black takes care of line length.
-    "logging-fstring-interpolation"
+    "logging-fstring-interpolation",
+    "duplicate-code",
 ]
extension-pkg-whitelist = "pydantic"

129 changes: 129 additions & 0 deletions sarc/alerts/usage_alerts/cluster_scraping.py
@@ -0,0 +1,129 @@
import logging
import sys
from datetime import datetime, timedelta
from typing import List, Optional

import pandas

from sarc.config import MTL
from sarc.jobs.series import compute_time_frames, load_job_series

logger = logging.getLogger(__name__)


def check_nb_jobs_per_cluster_per_time(
    time_interval: Optional[timedelta] = timedelta(days=7),
    time_unit=timedelta(days=1),
    cluster_names: Optional[List[str]] = None,
    nb_stddev=2,
    verbose=False,
):
"""
Check if we have scraped enough jobs per time unit per cluster on given time interval.
Log a warning for each cluster where number of jobs per time unit is lower than a limit
computed using mean and standard deviation statistics from this cluster.
Parameters
----------
time_interval: timedelta
If given, only jobs which ran in [now - time_interval, time_interval] will be used for checking.
Default is last 7 days.
If None, all jobs are used.
time_unit: timedelta
Time unit in which we must check cluster usage through time_interval. Default is 1 day.
cluster_names: list
Optional list of clusters to check.
If empty (or not specified), use all clusters available among jobs retrieved with time_interval.
nb_stddev: int
Amount of standard deviation to remove from average statistics to compute checking threshold.
For each cluster, threshold is computed as:
max(0, average - nb_stddev * stddev)
verbose: bool
If True, print supplementary info about clusters statistics.
"""

    # Parse time_interval
    start, end, clip_time = None, None, False
    if time_interval is not None:
        end = datetime.now(tz=MTL)
        start = end - time_interval
        clip_time = True

    # Get data frame
    df = load_job_series(start=start, end=end, clip_time=clip_time)

    # Split data frame into time frames using `time_unit`
    tf = compute_time_frames(df, frame_size=time_unit)

    # List all available timestamps.
    # We will check each timestamp for each cluster.
    timestamps = sorted(tf["timestamp"].unique())

    # List clusters
    if cluster_names:
        cluster_names = sorted(cluster_names)
    else:
        cluster_names = sorted(df["cluster_name"].unique())

    # Iterate over each cluster.
    for cluster_name in cluster_names:
        # Select only jobs for the current cluster,
        # group jobs by timestamp, and count jobs for each timestamp.
        f_stats = (
            tf[tf["cluster_name"] == cluster_name]
            .groupby(["timestamp"])[["job_id"]]
            .count()
        )

        # Create a dataframe with all available timestamps
        # and associate each timestamp to 0 jobs by default.
        c = (
            pandas.DataFrame({"timestamp": timestamps, "count": [0] * len(timestamps)})
            .groupby(["timestamp"])[["count"]]
            .sum()
        )
        # For each timestamp present for this cluster, set the real number of jobs scraped in that timestamp.
        c.loc[f_stats.index, "count"] = f_stats["job_id"]

        # We now have the number of jobs for each timestamp for this cluster,
        # with count 0 for timestamps where no jobs ran on the cluster.

        # Compute the average number of jobs per timestamp for this cluster
        avg = c["count"].mean()
        # Compute the standard deviation of the job count per timestamp for this cluster
        stddev = c["count"].std()
        # Compute the threshold to use for warnings: <average> - nb_stddev * <standard deviation>
        threshold = max(0, avg - nb_stddev * stddev)

        if verbose:
            print(f"[{cluster_name}]", file=sys.stderr)
            print(c, file=sys.stderr)
            print(f"avg {avg}, stddev {stddev}, threshold {threshold}", file=sys.stderr)
            print(file=sys.stderr)

        if threshold == 0:
            # If the threshold is zero, no check can be done, as the job count will always be >= 0.
            # Instead, we log a general warning.
            msg = f"[{cluster_name}] threshold 0 ({avg} - {nb_stddev} * {stddev})."
            if len(timestamps) == 1:
                msg += (
                    f" Only 1 timestamp found. Either time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            else:
                msg += (
                    f" Either nb_stddev is too high, time_interval ({time_interval}) is too short, "
                    f"or this cluster should not be currently checked"
                )
            logger.warning(msg)
        else:
            # With a non-null threshold, we can check each timestamp.
            for timestamp in timestamps:
                nb_jobs = c.loc[timestamp]["count"]
                if nb_jobs < threshold:
                    logger.warning(
                        f"[{cluster_name}][{timestamp}] "
                        f"insufficient cluster scraping: {nb_jobs} jobs / cluster / time unit; "
                        f"minimum required for this cluster: {threshold} ({avg} - {nb_stddev} * {stddev}); "
                        f"time unit: {time_unit}"
                    )
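
For reference, here is a minimal usage sketch (not part of this commit) of the new check, assuming a configured SARC environment with access to the jobs database; the cluster names, interval, and nb_stddev values below are illustrative only:

import logging
from datetime import timedelta

from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

# Make the logged warnings visible on the console.
logging.basicConfig(level=logging.WARNING)

# Check the "mila" and "raisin" clusters over the last 30 days with a 1-day
# time unit, flagging any day whose job count falls below mean - 1 * stddev.
check_nb_jobs_per_cluster_per_time(
    time_interval=timedelta(days=30),
    time_unit=timedelta(days=1),
    cluster_names=["mila", "raisin"],
    nb_stddev=1,
    verbose=True,
)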
58 changes: 58 additions & 0 deletions tests/functional/usage_alerts/test_alert_cluster_scraping.py
@@ -0,0 +1,58 @@
import functools
import re

import pytest

from sarc.alerts.usage_alerts.cluster_scraping import check_nb_jobs_per_cluster_per_time

from ..jobs.test_func_load_job_series import MOCK_TIME
from .common import _get_warnings

get_warnings = functools.partial(
    _get_warnings,
    module="sarc.alerts.usage_alerts.cluster_scraping:cluster_scraping.py",
)


@pytest.mark.freeze_time(MOCK_TIME)
@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
@pytest.mark.parametrize(
    "params",
    [
        # Check with default params. In the last 7 days from now (mock time: 2023-11-22),
        # there are only 2 jobs from 1 cluster in 1 timestamp. So, the threshold will be 0.
        dict(verbose=True),
        # Check with no time interval (i.e. all jobs).
        dict(time_interval=None, verbose=True),
        # Check with a supplementary cluster `another_cluster` which is not in the data frame.
        dict(
            time_interval=None,
            cluster_names=[
                "fromage",
                "mila",
                "patate",
                "raisin",
                "another_cluster",
            ],
            verbose=True,
        ),
        # Check the above case with 2 clusters ignored.
        dict(
            time_interval=None,
            cluster_names=[
                "mila",
                "raisin",
                "another_cluster",
            ],
        ),
    ],
)
def test_check_nb_jobs_per_cluster_per_time(params, capsys, caplog, file_regression):
    check_nb_jobs_per_cluster_per_time(**params)
    file_regression.check(
        re.sub(
            r"WARNING +sarc\.alerts\.usage_alerts\.cluster_scraping:cluster_scraping.py:[0-9]+ +",
            "",
            f"{capsys.readouterr().err}\n{caplog.text}",
        )
    )
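
The `_get_warnings` helper imported from `.common` is not shown in this diff. A hypothetical sketch of what such a helper might look like, assuming it filters pytest's caplog records by the "<logger name>:<file name>" string passed as `module` (the names and signature here are assumptions, not the actual implementation):

import logging
from typing import List


def _get_warnings(records: List[logging.LogRecord], module: str) -> List[str]:
    # Keep only WARNING records emitted by the given module
    # (matched as "<logger name>:<file name>") and return their messages.
    return [
        record.getMessage()
        for record in records
        if record.levelno == logging.WARNING
        and f"{record.name}:{record.filename}" == module
    ]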
@@ -0,0 +1,9 @@
load job series: 0%| | 0/2 [00:00<?, ?it/s]load job series: 100%|██████████| 2/2 [00:00<?, ?it/s]
[raisin]
count
timestamp
2023-11-21 07:00:00-05:00 2
avg 2.0, stddev nan, threshold 0


[raisin] threshold 0 (2.0 - 2 * nan). Only 1 timestamp found. Either time_interval (7 days, 0:00:00) is too short, or this cluster should not be currently checked
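
Aside: the "stddev nan" above is expected with a single timestamp, since pandas computes the sample standard deviation (ddof=1), which is undefined for one observation; `max(0, ...)` then yields 0 because `nan > 0` is False. A quick illustrative check:

import pandas

c = pandas.Series([2])           # a single timestamp with 2 jobs
avg, stddev = c.mean(), c.std()  # 2.0, nan
print(max(0, avg - 2 * stddev))  # 0 -> triggers the "threshold 0" warning path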
@@ -0,0 +1,58 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]
[fromage]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.125, stddev 0.3535533905932738, threshold 0

[mila]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 1
2023-02-20 00:01:00-05:00 1
2023-11-21 00:01:00-05:00 0
avg 0.375, stddev 0.5175491695067657, threshold 0

[patate]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.25, stddev 0.4629100498862757, threshold 0

[raisin]
count
timestamp
2023-02-14 00:01:00-05:00 4
2023-02-15 00:01:00-05:00 3
2023-02-16 00:01:00-05:00 4
2023-02-17 00:01:00-05:00 3
2023-02-18 00:01:00-05:00 3
2023-02-19 00:01:00-05:00 4
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 2
avg 2.875, stddev 1.3562026818605375, threshold 0.162594636278925


[fromage] threshold 0 (0.125 - 2 * 0.3535533905932738). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[patate] threshold 0 (0.25 - 2 * 0.4629100498862757). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00
@@ -0,0 +1,72 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]
[another_cluster]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.0, stddev 0.0, threshold 0

[fromage]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 0
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.125, stddev 0.3535533905932738, threshold 0

[mila]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 0
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 1
2023-02-20 00:01:00-05:00 1
2023-11-21 00:01:00-05:00 0
avg 0.375, stddev 0.5175491695067657, threshold 0

[patate]
count
timestamp
2023-02-14 00:01:00-05:00 0
2023-02-15 00:01:00-05:00 0
2023-02-16 00:01:00-05:00 0
2023-02-17 00:01:00-05:00 1
2023-02-18 00:01:00-05:00 1
2023-02-19 00:01:00-05:00 0
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 0
avg 0.25, stddev 0.4629100498862757, threshold 0

[raisin]
count
timestamp
2023-02-14 00:01:00-05:00 4
2023-02-15 00:01:00-05:00 3
2023-02-16 00:01:00-05:00 4
2023-02-17 00:01:00-05:00 3
2023-02-18 00:01:00-05:00 3
2023-02-19 00:01:00-05:00 4
2023-02-20 00:01:00-05:00 0
2023-11-21 00:01:00-05:00 2
avg 2.875, stddev 1.3562026818605375, threshold 0.162594636278925


[another_cluster] threshold 0 (0.0 - 2 * 0.0). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[fromage] threshold 0 (0.125 - 2 * 0.3535533905932738). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[patate] threshold 0 (0.25 - 2 * 0.4629100498862757). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00
@@ -0,0 +1,5 @@
load job series: 0%| | 0/24 [00:00<?, ?it/s]load job series: 100%|██████████| 24/24 [00:00<?, ?it/s]

[another_cluster] threshold 0 (0.0 - 2 * 0.0). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[mila] threshold 0 (0.375 - 2 * 0.5175491695067657). Either nb_stddev is too high, time_interval (None) is too short, or this cluster should not be currently checked
[raisin][2023-02-20 00:01:00-05:00] insufficient cluster scraping: 0 jobs / cluster / time unit; minimum required for this cluster: 0.162594636278925 (2.875 - 2 * 1.3562026818605375); time unit: 1 day, 0:00:00
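
As a sanity check on the numbers reported in these outputs, the "raisin" threshold can be reproduced directly from the formula in the docstring (illustrative arithmetic only):

avg = 2.875
stddev = 1.3562026818605375
nb_stddev = 2
print(max(0, avg - nb_stddev * stddev))  # 0.162594636278925, as in the warning above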
