cleanup_ci needs to use different expiration times for auto deployed and E2E clusters #512

Merged · 2 commits · Nov 1, 2019
207 changes: 126 additions & 81 deletions py/kubeflow/testing/cleanup_ci.py
@@ -20,34 +20,47 @@
# See https://github.com/kubeflow/testing/issues/444
# We are switching to unique names for auto deployments
# So this matches the new ones.
AUTO_DEPLOY_PATTERN = re.compile(r"kf-vmaster-(?!n\d\d)")

# Regexes that select matching deployments
MATCHING = [re.compile("e2e-.*"), re.compile("kfctl.*"),
re.compile("z-.*"), re.compile(".*presubmit.*"),
AUTO_DEPLOY_PATTERN]

MATCHING_FIREWALL_RULES = [re.compile("gke-kfctl-.*"),
re.compile("gke-e2e-.*"),
re.compile(".*presubmit.*"),
re.compile(".*postsubmit.*"),
AUTO_DEPLOY_PATTERN]

# Regexes that select matching disks
MATCHING_DISK = [re.compile(".*jlewi.*"), re.compile(".*kfctl.*"),
re.compile(".*postsubmit.*"), re.compile(".*presubmit.*"),
AUTO_DEPLOY_PATTERN]

def is_match_disk(name):
for m in MATCHING_DISK:
if m.match(name):
return True

return False
AUTO_DEPLOY_PATTERNS = [re.compile(r"kf-vmaster-(?!n\d\d)")]

E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*"),
re.compile(".*z-.*"), re.compile(".*presubmit.*")]

# Constants enumerating the different classes of infra
# We currently have 2 types
# Deployments created by E2E tests
# Auto deployments from master and release branches
AUTO_INFRA = "auto-deploy"
E2E_INFRA = "e2e"

# We use E2E_OWNERLESS for resources that we currently don't know how
# to map back to a particular Kubeflow deployment.
# In this case we can't determine whether it's an E2E deployment or an
# auto-deployed resource, so we just associate a long max age with it.
E2E_OWNERLESS = "e2e-ownerless"

# Map the different classes of infra to max lifetimes.
MAX_LIFETIME = {
# Auto-deployed infra should live long enough (currently 2 days) to
# facilitate debugging of examples
AUTO_INFRA: datetime.timedelta(days=2),
# E2E infra should be relatively short lived
E2E_INFRA: datetime.timedelta(hours=3),
# This should be at least as large as AUTO_INFRA
E2E_OWNERLESS: datetime.timedelta(days=2),
}

def name_to_infra_type(name):
"""Given a name map it to the type of infrastructure."""

if is_match(name, patterns=AUTO_DEPLOY_PATTERNS):
return AUTO_INFRA

if is_match(name, patterns=E2E_PATTERNS):
return E2E_INFRA
return None

def is_match(name, patterns=None):
if not patterns:
patterns = MATCHING
patterns = E2E_PATTERNS
for m in patterns:
if m.match(name):
return True
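
For reviewers, here's a small standalone sketch (hypothetical names, not part of this diff) of how the new classifier buckets resources; note the negative lookahead keeps legacy kf-vmaster-nNN names out of the auto-deploy bucket:

import datetime
import re

# Trimmed-down stand-ins for the patterns and map defined above.
AUTO_DEPLOY_PATTERNS = [re.compile(r"kf-vmaster-(?!n\d\d)")]
E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*"),
                re.compile(".*z-.*"), re.compile(".*presubmit.*")]
MAX_LIFETIME = {"auto-deploy": datetime.timedelta(days=2),
                "e2e": datetime.timedelta(hours=3)}

def name_to_infra_type(name):
    if any(p.match(name) for p in AUTO_DEPLOY_PATTERNS):
        return "auto-deploy"
    if any(p.match(name) for p in E2E_PATTERNS):
        return "e2e"
    return None

for name in ["kf-vmaster-abcd", "kf-vmaster-n01", "e2e-foo", "kfctl-bar"]:
    infra_type = name_to_infra_type(name)
    print(name, infra_type, MAX_LIFETIME.get(infra_type))
# kf-vmaster-abcd -> auto-deploy, 2 days
# kf-vmaster-n01  -> None (excluded by the (?!n\d\d) lookahead)
# e2e-foo         -> e2e, 3:00:00
# kfctl-bar       -> e2e, 3:00:00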
@@ -127,12 +140,19 @@ def cleanup_endpoints(args):

for s in results["services"]:
name = s["serviceName"]
if not is_match(name):
infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping endpoint %s; it does not match any infra type.",
name)
unmatched.append(name)
continue

logging.info("Endpoint %s categorized as %s", name, infra_type)

all_rollouts = rollouts.list(serviceName=name).execute()
is_expired = False
max_age = MAX_LIFETIME[infra_type]
if not all_rollouts.get("rollouts", []):
logging.info("Service %s has no rollouts", name)
is_expired = True
@@ -143,7 +163,7 @@
now = datetime.datetime.now(create_time.tzinfo)

age = now - create_time
if age > datetime.timedelta(hours=args.max_age_hours):
if age > max_age:
is_expired = True

if is_expired:
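
A minimal sketch of the offset-aware age check in this hunk (the diff doesn't show how create_time is parsed, so dateutil is an assumption here, and the timestamp is hypothetical):

import datetime
from dateutil import parser as date_parser  # assumption: parsing isn't shown in this diff

create_time = date_parser.parse("2019-10-31T09:00:00+00:00")  # hypothetical rollout time
now = datetime.datetime.now(create_time.tzinfo)  # aware "now" in the same offset
age = now - create_time
is_expired = age > datetime.timedelta(hours=3)   # e.g. MAX_LIFETIME[E2E_INFRA]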
@@ -187,12 +207,20 @@ def cleanup_disks(args):
break
for d in results["items"]:
name = d["name"]
if not is_match_disk(name):

infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping disk %s; it does not match any infra type.",
name)
unmatched.append(name)
continue

logging.info("Disk %s categorized as %s", name, infra_type)

max_age = MAX_LIFETIME[infra_type]
age = getAge(d["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_age_hours):
if age > max_age:
logging.info("Deleting disk: %s, age = %r", name, age)
if not args.dryrun:
response = disks.delete(project=args.project, zone=zone,
@@ -231,21 +259,22 @@ def cleanup_firewall_rules(args):
for d in results["items"]:
name = d["name"]

match = False
if is_match(name, patterns=MATCHING_FIREWALL_RULES):
match = True
infra_type = name_to_infra_type(name)

for tag in d.get("targetTags", []):
if is_match(tag, patterns=MATCHING_FIREWALL_RULES):
match = True
break
tag_infra_type = name_to_infra_type(tag)
if tag_infra_type:
infra_type = tag_infra_type

if not match:
if not infra_type:
unmatched.append(name)
continue

logging.info("Firewall rule %s classified as infra type %s", name,
infra_type)
max_age = MAX_LIFETIME[infra_type]
age = getAge(d["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_age_hours):
if age > max_age:
logging.info("Deleting firewall: %s, age = %r", name, age)
if not args.dryrun:
response = firewalls.delete(project=args.project,
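
One subtlety in this hunk: a rule is classified by its own name first, and any matching targetTag then overrides that (with no break, the last matching tag wins). A standalone sketch with a hypothetical rule:

import re

E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*")]

def name_to_infra_type(name):  # trimmed-down stand-in for the real classifier
    return "e2e" if any(p.match(name) for p in E2E_PATTERNS) else None

rule = {"name": "random-rule", "targetTags": ["gke-e2e-foo-node"]}
infra_type = name_to_infra_type(rule["name"])   # None: the name matches nothing
for tag in rule.get("targetTags", []):
    tag_infra_type = name_to_infra_type(tag)
    if tag_infra_type:
        infra_type = tag_infra_type             # "e2e" via the gke-e2e- tag
print(infra_type)  # e2e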
@@ -274,6 +303,9 @@ def cleanup_instance_groups(args):
unexpired = []
in_use = []

# TODO(jlewi): We should check whether the instance group is in use
# before deleting it. At least in pantheon it looks like instance groups
# are listed as in use by clusters.
for zone in args.zones.split(","): # pylint: disable=too-many-nested-blocks
while True:
results = instanceGroups.list(project=args.project,
@@ -284,8 +316,17 @@
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):

infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping intance group %s; it does not match any "
"infra type.", name)
continue

logging.info("Instance group %s categorized as %s", name, infra_type)

if age > MAX_LIFETIME[infra_type]:
logging.info("Deleting instanceGroups: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -328,8 +369,7 @@ def cleanup_url_maps(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -372,7 +412,7 @@ def cleanup_target_https_proxies(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_proxy_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -417,8 +457,7 @@ def cleanup_target_http_proxies(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -462,7 +501,7 @@ def cleanup_forwarding_rules(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_proxy_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting forwarding rule: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -506,8 +545,7 @@ def cleanup_backend_services(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting backend services: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -623,14 +661,18 @@ def cleanup_certificates(args):

name = d["name"]
domain = get_ssl_certificate_domain(d)
# Expire e2e certs after 4 hours
if domain.startswith("kfct"):
max_age = datetime.timedelta(hours=4)
else:
# For autodeployments delete after seven days
max_age = datetime.timedelta(days=7)

if age > max_age:
infra_type = name_to_infra_type(domain)

if not infra_type:
logging.info("Skipping certificate named %s for domain %s; "
"it does not match any infra type.", name, domain)
continue

logging.info("Certificate named %s for domain %s categorized as %s",
name, domain, infra_type)

if age > MAX_LIFETIME[infra_type]:
logging.info("Deleting certifcate: %s for domain %s", d["name"], domain)
is_expired = True
if not args.dryrun:
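
Compared to the old hard-coded branch on the "kfct" domain prefix (4 hours vs 7 days), this hunk reuses the shared classifier on the certificate's domain. A sketch of the resulting buckets for two hypothetical domains, assuming the patterns defined at the top of this file:

import re

AUTO_DEPLOY_PATTERNS = [re.compile(r"kf-vmaster-(?!n\d\d)")]
E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*")]

def classify(domain):  # stand-in for name_to_infra_type
    if any(p.match(domain) for p in AUTO_DEPLOY_PATTERNS):
        return "auto-deploy"  # 2 days under MAX_LIFETIME (was 7 days)
    if any(p.match(domain) for p in E2E_PATTERNS):
        return "e2e"          # 3 hours by default (was a fixed 4 hours)
    return None

print(classify("kfctl-e2e-abc.endpoints.some-project.cloud.goog"))   # e2e
print(classify("kf-vmaster-xyz.endpoints.some-project.cloud.goog"))  # auto-deploy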
@@ -679,13 +721,16 @@ def cleanup_service_accounts(args):
# Service accounts don't specify the creation date time. So we
# use the creation time of the key associated with the account.
for a in accounts:
if not is_match(a["email"]):
logging.info("Skipping key %s; it does not match expected names.",
a["email"])
infra_type = name_to_infra_type(a["email"])

if not infra_type:
logging.info("Skipping service account %s; it does not match any "
"infra type.", a["email"])
unmatched_emails.append(a["email"])
continue

logging.info("Service account %s categorized as %s", a["email"], infra_type)

keys = keys_client.list(name=a["name"]).execute()

is_expired = True
@@ -694,7 +739,7 @@
now = datetime.datetime.now(valid_time.tzinfo)

age = now - valid_time
if age < datetime.timedelta(hours=args.max_age_hours):
if age < MAX_LIFETIME[infra_type]:
is_expired = False
break
if is_expired:
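
Worth noting for reviewers: unlike the other cleanups, the service-account check starts from is_expired = True and flips it off if any key is still young enough. A standalone sketch of that inversion (key ages hypothetical):

import datetime

max_age = datetime.timedelta(hours=3)     # e.g. MAX_LIFETIME["e2e"]
key_ages = [datetime.timedelta(hours=9),  # stand-ins for ages derived from validAfterTime
            datetime.timedelta(hours=1)]

is_expired = True
for age in key_ages:
    if age < max_age:    # any sufficiently fresh key keeps the account alive
        is_expired = False
        break
print(is_expired)  # False: the 1-hour-old key keeps the account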
@@ -824,11 +869,15 @@ def cleanup_deployments(args): # pylint: disable=too-many-statements,too-many-branches

name = d["name"]

if not is_match(name):
logging.info("Skipping Deployment %s; it does not match expected names.",
infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping Deployment %s; it does not match any infra type.",
name)
continue

logging.info("Deployment %s categorized as %s", name, infra_type)

full_insert_time = d.get("insertTime")
age = getAge(full_insert_time)

@@ -838,14 +887,14 @@
d.get("name"), d.get("operation").get("error"))
max_age = datetime.timedelta(minutes=10)
else:
max_age = datetime.timedelta(hours=args.max_age_hours)
max_age = MAX_LIFETIME[infra_type]

if age < max_age:
unexpired.append(name)
logging.info("Deployment %s has not expired", name)
logging.info("Deployment %s has not expired; max age %s", name, max_age)
continue

logging.info("Deployment %s has expired", name)
logging.info("Deployment %s has expired; max_age %s", name, max_age)
expired.append(name)
logging.info("Deleting deployment %s", name)

@@ -884,11 +933,16 @@ def cleanup_clusters(args):
continue
for c in clusters["clusters"]:
name = c["name"]
if not is_match(name):
logging.info("Skipping cluster%s; it does not match expected names.",

infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping cluster %s; it does not match any infra type.",
name)
continue

logging.info("Deployment %s categorized as %s", name, infra_type)

full_insert_time = c["createTime"]
insert_time_str = full_insert_time[:-6]
tz_offset = full_insert_time[-6:]
@@ -906,8 +960,7 @@
logging.info("Cluster %s is in error state; %s", c["name"], c.get("statusMessage", ""))
max_age = datetime.timedelta(minutes=10)
else:
max_age = datetime.timedelta(hours=args.max_age_hours)

max_age = MAX_LIFETIME[infra_type]

if age > max_age:
if c.get("status", "") == "STOPPING":
@@ -1012,19 +1065,7 @@ def main():

parser.add_argument(
"--gc_backend_services", default=False, type=bool,
help=("""Whether to GC backend services that are older
than --max_ci_deployment_resource_age_hours."""))

parser.add_argument(
"--max_ci_deployment_resource_age_hours",
default=24, type=int,
help=("The age of resources in kubeflow-ci-deployment to gc."))

parser.add_argument(
"--max_proxy_age_hours",
default=4, type=int,
help=("""The age of forwarding rules, proxy and SSL certificate in
kubeflow-ci-deployment to gc."""))
help=("""Whether to GC backend services."""))

parser.add_argument(
"--max_wf_age_hours", default=7*24, type=int,
@@ -1116,9 +1157,13 @@ def main():

parser_clusters.set_defaults(func=cleanup_clusters)


args = parser.parse_args()

# Update max age
MAX_LIFETIME[E2E_INFRA] = datetime.timedelta(hours=args.max_age_hours)

logging.info("Max lifetime:\n%s", MAX_LIFETIME)

util.maybe_activate_service_account()

args.func(args)
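
A note on the override just above: MAX_LIFETIME is a module-level dict, so mutating it after parsing flags changes the E2E lifetime for every cleanup function, while the auto-deploy and ownerless lifetimes stay hard-coded. A standalone sketch of that semantics (flag value hypothetical):

import datetime

MAX_LIFETIME = {"auto-deploy": datetime.timedelta(days=2),
                "e2e": datetime.timedelta(hours=3),
                "e2e-ownerless": datetime.timedelta(days=2)}

max_age_hours = 6  # stand-in for args.max_age_hours
MAX_LIFETIME["e2e"] = datetime.timedelta(hours=max_age_hours)
print(MAX_LIFETIME["e2e"])          # 6:00:00
print(MAX_LIFETIME["auto-deploy"])  # 2 days, 0:00:00 (unchanged)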