Skip to content

Commit

Permalink
feat(notify): [agent6] Prevent multiple messages (#32798)
Browse files Browse the repository at this point in the history
  • Loading branch information
chouetz authored Jan 9, 2025
1 parent 70ecd2b commit e5cdbf5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 119 deletions.
54 changes: 0 additions & 54 deletions tasks/libs/pipeline_notifications.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import json
import os
import pathlib
import re
import subprocess
from collections import defaultdict
from typing import Dict

import yaml

from tasks.libs.common.gitlab import Gitlab, get_gitlab_token
from tasks.libs.types import FailedJobs, Test


def load_and_validate(file_name: str, default_placeholder: str, default_value: str) -> Dict[str, str]:
p = pathlib.Path(os.path.realpath(__file__)).parent.joinpath(file_name)
Expand Down Expand Up @@ -56,55 +51,6 @@ def check_for_missing_owners_slack_and_jira(print_missing_teams=True, owners_fil
return error


def get_failed_tests(project_name, job, owners_file=".github/CODEOWNERS"):
    """Fetch a failed job's test_output.json artifact and return its failing tests.

    Parses the go-test JSON event stream line by line, recording each
    top-level test whose action is "fail" and dropping any test that later
    passes on retry. Subtests (names containing "/") are skipped so only
    parent tests are reported.

    Returns a view of Test objects, keyed internally by (package, test name).
    """
    gitlab = Gitlab(project_name=project_name, api_token=get_gitlab_token())
    owners = read_owners(owners_file)

    # Artifact fetching is best-effort: a missing or unreadable artifact
    # simply means no failed tests are reported for this job.
    try:
        test_output = gitlab.artifact(job["id"], "test_output.json", ignore_not_found=True)
    except Exception as e:
        print("Ignoring test fetching error", e)
        test_output = None

    failed_tests = {}  # maps (package, test name) -> Test
    if not test_output:
        return failed_tests.values()

    for line in test_output.iter_lines():
        event = json.loads(line)
        if 'Test' not in event:
            continue
        name = event['Test']
        package = event['Package']
        action = event["Action"]

        if action == "fail":
            # Ignore subtests, only the parent test should be reported for now
            # to avoid multiple reports on the same test
            # NTH: maybe the Test object should be more flexible to incorporate
            # subtests? This would require some postprocessing of the Test objects
            # we yield here to merge child Test objects with their parents.
            if '/' not in name:  # Subtests have a name of the form "Test/Subtest"
                failed_tests[(package, name)] = Test(owners, name, package)
        elif action == "pass" and (package, name) in failed_tests:
            print(f"Test {name} from package {package} passed after retry, removing from output")
            del failed_tests[(package, name)]

    return failed_tests.values()


def find_job_owners(failed_jobs: FailedJobs, owners_file: str = ".gitlab/JOBOWNERS") -> Dict[str, FailedJobs]:
    """Group the non-infra failed jobs by the team that owns them.

    Ownership is resolved through the JOBOWNERS file; only owners of kind
    "TEAM" are collected, so individual-user owners never receive a message.
    """
    owners = read_owners(owners_file)
    owners_to_notify = defaultdict(FailedJobs)

    for failed_job in failed_jobs.all_non_infra_failures():
        # owners.of() yields (kind, name) tuples, e.g.
        # [('TEAM', '@DataDog/agent-ci-experience')]
        for owner_kind, owner_name in owners.of(failed_job["name"]):
            if owner_kind == "TEAM":
                owners_to_notify[owner_name].add_failed_job(failed_job)

    return owners_to_notify


def base_message(header, state):
project_title = os.getenv("CI_PROJECT_TITLE")
# commit_title needs a default string value, otherwise the re.search line below crashes
Expand Down
12 changes: 0 additions & 12 deletions tasks/libs/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ def add_failed_job(self, job):
else:
self.mandatory_job_failures.append(job)

def all_non_infra_failures(self):
    """Return every failed job not caused by infrastructure, mandatory jobs first."""
    return [*self.mandatory_job_failures, *self.optional_job_failures]

def all_mandatory_failures(self):
    """Return every mandatory failed job, whether or not it was infra-related."""
    return [*self.mandatory_job_failures, *self.mandatory_infra_job_failures]

Expand Down Expand Up @@ -99,9 +96,6 @@ def __init__(self, base: str = "", jobs: FailedJobs = None):
self.failed_tests = defaultdict(list)
self.coda = ""

def add_test_failure(self, test, job):
    """Record that *job* produced a failure of *test*, bucketed by the test's key."""
    jobs_for_test = self.failed_tests[test.key]
    jobs_for_test.append(job)

def __render_jobs_section(self, header: str, jobs: list, buffer: io.StringIO):
if not jobs:
return
Expand Down Expand Up @@ -164,9 +158,3 @@ def __str__(self):
if self.coda:
print(self.coda, file=buffer)
return buffer.getvalue()


class TeamMessage(SlackMessage):
    """Slack message variant addressed to a single owning team.

    Only overrides the section headers so each team sees wording scoped to
    the jobs and tests it owns, instead of the pipeline-wide global headers.
    """

    JOBS_SECTION_HEADER = "Failed jobs you own:"
    OPTIONAL_JOBS_SECTION_HEADER = "Failed jobs (allowed to fail) you own:"
    TEST_SECTION_HEADER = "Failed unit tests you own:"
66 changes: 13 additions & 53 deletions tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,16 @@
import re
import tempfile
import traceback
from collections import defaultdict
from datetime import datetime
from typing import Dict

from invoke import task
from invoke.exceptions import Exit, UnexpectedExit

from tasks.libs.datadog_api import create_count, send_metrics
from tasks.libs.pipeline_data import get_failed_jobs
from tasks.libs.pipeline_notifications import (
base_message,
check_for_missing_owners_slack_and_jira,
find_job_owners,
get_failed_tests,
send_slack_message,
)
from tasks.libs.pipeline_notifications import base_message, check_for_missing_owners_slack_and_jira, send_slack_message
from tasks.libs.pipeline_stats import get_failed_jobs_stats
from tasks.libs.types import FailedJobs, SlackMessage, TeamMessage
from tasks.libs.types import SlackMessage

PROJECT_NAME = "DataDog/datadog-agent"
AWS_S3_CP_CMD = "aws s3 cp --only-show-errors --region us-east-1 --sse AES256"
Expand Down Expand Up @@ -53,17 +45,15 @@ def send_message(_, notification_type="merge", print_to_stdout=False):
"""
try:
failed_jobs = get_failed_jobs(PROJECT_NAME, os.getenv("CI_PIPELINE_ID"))
messages_to_send = generate_failure_messages(PROJECT_NAME, failed_jobs)
message = SlackMessage(jobs=failed_jobs)
except Exception as e:
buffer = io.StringIO()
print(base_message("datadog-agent", "is in an unknown state"), file=buffer)
print("Found exception when generating notification:", file=buffer)
traceback.print_exc(limit=-1, file=buffer)
print("See the notify job log for the full exception traceback.", file=buffer)

messages_to_send = {
"@DataDog/agent-all": SlackMessage(base=buffer.getvalue()),
}
message = (SlackMessage(base=buffer.getvalue()),)
# Print traceback on job log
print(e)
traceback.print_exc()
Expand All @@ -87,16 +77,15 @@ def send_message(_, notification_type="merge", print_to_stdout=False):
header = f"{header_icon} :rocket: datadog-agent deploy"
base = base_message(header, state)

# Send messages
for _, message in messages_to_send.items():
# We don't generate one message per owner, only the global one
channel = "#agent-agent6-ops"
message.base_message = base
message.coda = coda
if print_to_stdout:
print(f"Would send to {channel}:\n{str(message)}")
else:
send_slack_message(channel, str(message)) # TODO: use channel variable
# Send message
# We don't generate one message per owner, only the global one
channel = "#agent-agent6-ops"
message.base_message = base
message.coda = coda
if print_to_stdout:
print(f"Would send to {channel}:\n{str(message)}")
else:
send_slack_message(channel, str(message)) # TODO: use channel variable


@task
Expand Down Expand Up @@ -169,35 +158,6 @@ def send_stats(_, print_to_stdout=False):
print(f"Would send: {series}")


# Tasks to trigger pipeline notifications


def generate_failure_messages(project_name: str, failed_jobs: FailedJobs) -> Dict[str, SlackMessage]:
    """Build the per-recipient Slack messages for a failed pipeline.

    Always produces a global message for @DataDog/agent-all carrying every
    failed job, plus one TeamMessage per owning team. Jobs owned by
    "@DataDog/multiple" are expanded into their individual failed tests and
    attached to the global message; "@DataDog/do-not-notify" jobs are dropped.
    """
    all_teams = "@DataDog/agent-all"

    # Generate messages for each team
    messages_to_send = defaultdict(TeamMessage)
    messages_to_send[all_teams] = SlackMessage(jobs=failed_jobs)

    for owner, jobs in find_job_owners(failed_jobs).items():
        if owner == "@DataDog/do-not-notify":
            # Jobs owned by @DataDog/do-not-notify do not send team messages
            continue
        if owner == all_teams:
            # Jobs owned by @DataDog/agent-all will already be in the global
            # message, do not overwrite the failed jobs list
            continue
        if owner == "@DataDog/multiple":
            # Shared jobs: surface the individual failed tests on the global message
            for job in jobs.all_non_infra_failures():
                for test in get_failed_tests(project_name, job):
                    messages_to_send[all_teams].add_test_failure(test, job)
        else:
            messages_to_send[owner].failed_jobs = jobs

    return messages_to_send


@task
def check_consistent_failures(ctx, job_failures_file="job_executions.json"):
# Retrieve the stored document in aws s3. It has the following format:
Expand Down

0 comments on commit e5cdbf5

Please sign in to comment.