From e5cdbf582179938c51e13105f2b92120c54f69b0 Mon Sep 17 00:00:00 2001
From: Nicolas Schweitzer
Date: Thu, 9 Jan 2025 14:14:29 +0100
Subject: [PATCH] feat(notify): [agent6] Prevent multiple messages (#32798)

---
 tasks/libs/pipeline_notifications.py | 54 -----------------------
 tasks/libs/types.py                  | 12 -----
 tasks/notify.py                      | 66 ++++++----------------------
 3 files changed, 13 insertions(+), 119 deletions(-)

diff --git a/tasks/libs/pipeline_notifications.py b/tasks/libs/pipeline_notifications.py
index 071eb69f02322..4e64e58bdb170 100644
--- a/tasks/libs/pipeline_notifications.py
+++ b/tasks/libs/pipeline_notifications.py
@@ -1,16 +1,11 @@
-import json
 import os
 import pathlib
 import re
 import subprocess
-from collections import defaultdict
 from typing import Dict
 
 import yaml
 
-from tasks.libs.common.gitlab import Gitlab, get_gitlab_token
-from tasks.libs.types import FailedJobs, Test
-
 
 def load_and_validate(file_name: str, default_placeholder: str, default_value: str) -> Dict[str, str]:
     p = pathlib.Path(os.path.realpath(__file__)).parent.joinpath(file_name)
@@ -56,55 +51,6 @@ def check_for_missing_owners_slack_and_jira(print_missing_teams=True, owners_fil
     return error
 
 
-def get_failed_tests(project_name, job, owners_file=".github/CODEOWNERS"):
-    gitlab = Gitlab(project_name=project_name, api_token=get_gitlab_token())
-    owners = read_owners(owners_file)
-    try:
-        test_output = gitlab.artifact(job["id"], "test_output.json", ignore_not_found=True)
-    except Exception as e:
-        print("Ignoring test fetching error", e)
-        test_output = None
-    failed_tests = {}  # type: dict[tuple[str, str], Test]
-    if test_output:
-        for line in test_output.iter_lines():
-            json_test = json.loads(line)
-            if 'Test' in json_test:
-                name = json_test['Test']
-                package = json_test['Package']
-                action = json_test["Action"]
-
-                if action == "fail":
-                    # Ignore subtests, only the parent test should be reported for now
-                    # to avoid multiple reports on the same test
-                    # NTH: maybe the Test object should be more flexible to incorporate
-                    # subtests? This would require some postprocessing of the Test objects
-                    # we yield here to merge child Test objects with their parents.
-                    if '/' in name:  # Subtests have a name of the form "Test/Subtest"
-                        continue
-                    failed_tests[(package, name)] = Test(owners, name, package)
-                elif action == "pass" and (package, name) in failed_tests:
-                    print(f"Test {name} from package {package} passed after retry, removing from output")
-                    del failed_tests[(package, name)]
-
-    return failed_tests.values()
-
-
-def find_job_owners(failed_jobs: FailedJobs, owners_file: str = ".gitlab/JOBOWNERS") -> Dict[str, FailedJobs]:
-    owners = read_owners(owners_file)
-    owners_to_notify = defaultdict(FailedJobs)
-
-    for job in failed_jobs.all_non_infra_failures():
-        job_owners = owners.of(job["name"])
-        # job_owners is a list of tuples containing the type of owner (eg. USERNAME, TEAM) and the name of the owner
-        # eg. [('TEAM', '@DataDog/agent-ci-experience')]
-
-        for kind, owner in job_owners:
-            if kind == "TEAM":
-                owners_to_notify[owner].add_failed_job(job)
-
-    return owners_to_notify
-
-
 def base_message(header, state):
     project_title = os.getenv("CI_PROJECT_TITLE")
     # commit_title needs a default string value, otherwise the re.search line below crashes
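For context on what this hunk deletes: `find_job_owners` implemented a group-by over failed jobs, bucketing each job under the teams resolved from `.gitlab/JOBOWNERS`. Below is a minimal self-contained sketch of that pattern; the `OWNERS` dict is a hypothetical stand-in for the parsed JOBOWNERS matcher (`owners.of(job["name"])` in the real code), and the job names are illustrative only.

```python
from collections import defaultdict

# Hypothetical stand-in for the parsed .gitlab/JOBOWNERS file; the real code
# matches job names against glob-style rules via owners.of(job["name"]).
OWNERS = {
    "tests_deb-x64-py3": [("TEAM", "@DataDog/agent-ci-experience")],
    "build_system-probe": [("TEAM", "@DataDog/ebpf-platform")],
}

def find_job_owners(failed_job_names):
    """Bucket each failed job under every team that owns it."""
    owners_to_notify = defaultdict(list)
    for name in failed_job_names:
        for kind, owner in OWNERS.get(name, []):
            if kind == "TEAM":
                owners_to_notify[owner].append(name)
    return owners_to_notify

print(dict(find_job_owners(["tests_deb-x64-py3", "build_system-probe"])))
```

With per-team fan-out gone from the notify flow, this grouping has no remaining caller, which is why the function is removed rather than kept around.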
diff --git a/tasks/libs/types.py b/tasks/libs/types.py
index 089ddf65de5ea..8e169bfdea3ac 100644
--- a/tasks/libs/types.py
+++ b/tasks/libs/types.py
@@ -69,9 +69,6 @@ def add_failed_job(self, job):
         else:
             self.mandatory_job_failures.append(job)
 
-    def all_non_infra_failures(self):
-        return self.mandatory_job_failures + self.optional_job_failures
-
     def all_mandatory_failures(self):
         return self.mandatory_job_failures + self.mandatory_infra_job_failures
 
@@ -99,9 +96,6 @@ def __init__(self, base: str = "", jobs: FailedJobs = None):
         self.failed_tests = defaultdict(list)
         self.coda = ""
 
-    def add_test_failure(self, test, job):
-        self.failed_tests[test.key].append(job)
-
     def __render_jobs_section(self, header: str, jobs: list, buffer: io.StringIO):
         if not jobs:
             return
@@ -164,9 +158,3 @@ def __str__(self):
         if self.coda:
             print(self.coda, file=buffer)
         return buffer.getvalue()
-
-
-class TeamMessage(SlackMessage):
-    JOBS_SECTION_HEADER = "Failed jobs you own:"
-    OPTIONAL_JOBS_SECTION_HEADER = "Failed jobs (allowed to fail) you own:"
-    TEST_SECTION_HEADER = "Failed unit tests you own:"
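The `FailedJobs` container keeps its mandatory/optional/infra split; only the team-oriented pieces (`all_non_infra_failures`, `add_test_failure`, and the `TeamMessage` subclass) go away. The sketch below is a rough, simplified version of the surviving classification; the real `add_failed_job` also inspects failure types to route infra issues, which is elided here.

```python
class FailedJobs:
    """Simplified sketch; the real class also tracks optional infra failures."""

    def __init__(self):
        self.mandatory_job_failures = []
        self.optional_job_failures = []
        self.mandatory_infra_job_failures = []

    def add_failed_job(self, job):
        # allow_failure is the GitLab job attribute; infra detection is elided
        if job.get("allow_failure"):
            self.optional_job_failures.append(job)
        else:
            self.mandatory_job_failures.append(job)

    def all_mandatory_failures(self):
        return self.mandatory_job_failures + self.mandatory_infra_job_failures

jobs = FailedJobs()
jobs.add_failed_job({"name": "tests_deb-x64-py3", "allow_failure": False})
jobs.add_failed_job({"name": "netperf", "allow_failure": True})
print([j["name"] for j in jobs.all_mandatory_failures()])  # ['tests_deb-x64-py3']
```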
diff --git a/tasks/notify.py b/tasks/notify.py
index ebaa00c612f96..2665b6012bd3d 100644
--- a/tasks/notify.py
+++ b/tasks/notify.py
@@ -4,24 +4,16 @@
 import re
 import tempfile
 import traceback
-from collections import defaultdict
 from datetime import datetime
-from typing import Dict
 
 from invoke import task
 from invoke.exceptions import Exit, UnexpectedExit
 
 from tasks.libs.datadog_api import create_count, send_metrics
 from tasks.libs.pipeline_data import get_failed_jobs
-from tasks.libs.pipeline_notifications import (
-    base_message,
-    check_for_missing_owners_slack_and_jira,
-    find_job_owners,
-    get_failed_tests,
-    send_slack_message,
-)
+from tasks.libs.pipeline_notifications import base_message, check_for_missing_owners_slack_and_jira, send_slack_message
 from tasks.libs.pipeline_stats import get_failed_jobs_stats
-from tasks.libs.types import FailedJobs, SlackMessage, TeamMessage
+from tasks.libs.types import SlackMessage
 
 PROJECT_NAME = "DataDog/datadog-agent"
 AWS_S3_CP_CMD = "aws s3 cp --only-show-errors --region us-east-1 --sse AES256"
@@ -53,7 +45,7 @@ def send_message(_, notification_type="merge", print_to_stdout=False):
     """
     try:
         failed_jobs = get_failed_jobs(PROJECT_NAME, os.getenv("CI_PIPELINE_ID"))
-        messages_to_send = generate_failure_messages(PROJECT_NAME, failed_jobs)
+        message = SlackMessage(jobs=failed_jobs)
     except Exception as e:
         buffer = io.StringIO()
         print(base_message("datadog-agent", "is in an unknown state"), file=buffer)
@@ -61,9 +53,7 @@
         traceback.print_exc(limit=-1, file=buffer)
         print("See the notify job log for the full exception traceback.", file=buffer)
 
-        messages_to_send = {
-            "@DataDog/agent-all": SlackMessage(base=buffer.getvalue()),
-        }
+        message = SlackMessage(base=buffer.getvalue())
         # Print traceback on job log
         print(e)
         traceback.print_exc()
@@ -87,16 +77,15 @@
         header = f"{header_icon} :rocket: datadog-agent deploy"
     base = base_message(header, state)
 
-    # Send messages
-    for _, message in messages_to_send.items():
-        # We don't generate one message per owner, only the global one
-        channel = "#agent-agent6-ops"
-        message.base_message = base
-        message.coda = coda
-        if print_to_stdout:
-            print(f"Would send to {channel}:\n{str(message)}")
-        else:
-            send_slack_message(channel, str(message))  # TODO: use channel variable
+    # Send message
+    # We don't generate one message per owner, only the global one
+    channel = "#agent-agent6-ops"
+    message.base_message = base
+    message.coda = coda
+    if print_to_stdout:
+        print(f"Would send to {channel}:\n{str(message)}")
+    else:
+        send_slack_message(channel, str(message))  # TODO: use channel variable
 
 
 @task
@@ -169,35 +158,6 @@ def send_stats(_, print_to_stdout=False):
         print(f"Would send: {series}")
 
 
-# Tasks to trigger pipeline notifications
-
-
-def generate_failure_messages(project_name: str, failed_jobs: FailedJobs) -> Dict[str, SlackMessage]:
-    all_teams = "@DataDog/agent-all"
-
-    # Generate messages for each team
-    messages_to_send = defaultdict(TeamMessage)
-    messages_to_send[all_teams] = SlackMessage(jobs=failed_jobs)
-
-    failed_job_owners = find_job_owners(failed_jobs)
-    for owner, jobs in failed_job_owners.items():
-        if owner == "@DataDog/multiple":
-            for job in jobs.all_non_infra_failures():
-                for test in get_failed_tests(project_name, job):
-                    messages_to_send[all_teams].add_test_failure(test, job)
-        elif owner == "@DataDog/do-not-notify":
-            # Jobs owned by @DataDog/do-not-notify do not send team messages
-            pass
-        elif owner == all_teams:
-            # Jobs owned by @DataDog/agent-all will already be in the global
-            # message, do not overwrite the failed jobs list
-            pass
-        else:
-            messages_to_send[owner].failed_jobs = jobs
-
-    return messages_to_send
-
-
 @task
 def check_consistent_failures(ctx, job_failures_file="job_executions.json"):
     # Retrieve the stored document in aws s3. It has the following format:
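After this patch, `send_message` builds exactly one global `SlackMessage` and posts it to the hard-coded `#agent-agent6-ops` channel, instead of fanning out one message per owning team. (Note the exception branch now assigns a plain `SlackMessage`; the parenthesized form with a trailing comma would have produced a 1-tuple and crashed later at `message.base_message = base`.) A dry-run sketch of the resulting flow, using a stub `SlackMessage` that exposes the same surface notify.py relies on (`base_message`, `coda`, `str(message)`); the header and coda strings here are placeholders, not the real ones:

```python
class SlackMessage:
    """Stub with the attributes notify.py sets on the real SlackMessage."""

    def __init__(self, jobs=None, base=""):
        self.jobs = jobs or []
        self.base_message = base
        self.coda = ""

    def __str__(self):
        lines = [self.base_message]
        lines.extend(f"- {job}" for job in self.jobs)
        if self.coda:
            lines.append(self.coda)
        return "\n".join(lines)

def send_message(failed_jobs, print_to_stdout=False):
    message = SlackMessage(jobs=failed_jobs)  # one global message, no per-team fan-out
    channel = "#agent-agent6-ops"
    message.base_message = "datadog-agent pipeline failed"  # placeholder header
    message.coda = "See the pipeline for details."          # placeholder coda
    if print_to_stdout:
        print(f"Would send to {channel}:\n{message}")
    # else: post via send_slack_message(channel, str(message)) in the real task

send_message(["tests_deb-x64-py3", "build_system-probe"], print_to_stdout=True)
```

Assuming the usual invoke wiring of this repo's tasks, the dry-run path should be reachable with something like `invoke notify.send-message --print-to-stdout`.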