cleanup_ci needs to use different expiration times for auto-deployed and E2E clusters

* The auto-deployed clusters now use unique names rather than being
  recycled, and we rely on cleanup_ci.py to GC old auto-deployments (#444)

* To support this we need to use variable expiration times (a condensed
  sketch follows this message).

  * Deployments created by tests should expire as soon as the test is done (so 1-2 hours)

  * But auto-deployed clusters need to live longer

    * They are only refreshed periodically by a cron job (~8 hours); we don't
      want to delete a cluster before its replacement is deployed, because the
      example tests need a cluster

    * We want to leave the clusters up as long as we can to facilitate debugging
      by people working on example tests.

Related to #444
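In outline, the change replaces the single fixed cutoff with a per-class lifetime lookup keyed by the kind of infrastructure a resource name maps to. Below is a minimal sketch of the pattern, condensed from the diff that follows; the constants, regexes, and default lifetimes are taken from cleanup_ci.py as of this commit (the diff also adds an E2E_OWNERLESS class for resources it can't attribute, omitted here for brevity):

import datetime
import re

# New-style auto deployments use unique names (#444); the negative
# lookahead skips the old recycled kf-vmaster-nNN names.
AUTO_DEPLOY_PATTERNS = [re.compile(r"kf-vmaster-(?!n\d\d)")]
E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*"),
                re.compile(".*z-.*"), re.compile(".*presubmit.*")]

AUTO_INFRA = "auto-deploy"
E2E_INFRA = "e2e"

MAX_LIFETIME = {
  # Must outlive the ~8 hour refresh cron and leave time for debugging.
  AUTO_INFRA: datetime.timedelta(days=2),
  # Test deployments can be collected shortly after the test finishes.
  E2E_INFRA: datetime.timedelta(hours=3),
}

def name_to_infra_type(name):
  """Map a resource name to its infra class; None means don't GC it."""
  if any(p.match(name) for p in AUTO_DEPLOY_PATTERNS):
    return AUTO_INFRA
  if any(p.match(name) for p in E2E_PATTERNS):
    return E2E_INFRA
  return None

Each cleanup_* pass then compares a resource's age against MAX_LIFETIME[infra_type] instead of one global --max_age_hours value.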
Jeremy Lewi committed Nov 1, 2019
1 parent fd24db8 commit 156497b
Showing 3 changed files with 151 additions and 82 deletions.
204 changes: 125 additions & 79 deletions py/kubeflow/testing/cleanup_ci.py
@@ -20,34 +20,47 @@
# See https://github.com/kubeflow/testing/issues/444
# We are switching to unique names for auto deployments
# So this matches the new ones.
AUTO_DEPLOY_PATTERN = re.compile(r"kf-vmaster-(?!n\d\d)")

# Regexes that select matching deployments
MATCHING = [re.compile("e2e-.*"), re.compile("kfctl.*"),
            re.compile("z-.*"), re.compile(".*presubmit.*"),
            AUTO_DEPLOY_PATTERN]

MATCHING_FIREWALL_RULES = [re.compile("gke-kfctl-.*"),
                           re.compile("gke-e2e-.*"),
                           re.compile(".*presubmit.*"),
                           re.compile(".*postsubmit.*"),
                           AUTO_DEPLOY_PATTERN]

# Regexes that select matching disks
MATCHING_DISK = [re.compile(".*jlewi.*"), re.compile(".*kfctl.*"),
                 re.compile(".*postsubmit.*"), re.compile(".*presubmit.*"),
                 AUTO_DEPLOY_PATTERN]

def is_match_disk(name):
  for m in MATCHING_DISK:
    if m.match(name):
      return True

  return False
AUTO_DEPLOY_PATTERNS = [re.compile(r"kf-vmaster-(?!n\d\d)")]

E2E_PATTERNS = [re.compile(".*e2e-.*"), re.compile(".*kfctl.*"),
                re.compile(".*z-.*"), re.compile(".*presubmit.*")]

# Constants enumerating the different classes of infra
# We currently have 2 types
# Deployments created by E2E tests
# Auto deployments from master and release branches
AUTO_INFRA = "auto-deploy"
E2E_INFRA = "e2e"

# We use E2E_OWNERLESS for resources that we currently don't know how
# to map back to a particular Kubeflow deployment.
# In this case we can't determine whether it's an E2E deployment or an
# auto-deployed resource, so we just associate a long max age with it.
E2E_OWNERLESS = "e2e-ownerless"

# Map the different classes of infra to max lifetimes.
MAX_LIFETIME = {
  # Auto infra needs to outlive the ~8 hour refresh cron and stay up long
  # enough to facilitate debugging of examples
  AUTO_INFRA: datetime.timedelta(days=2),
  # E2E infra should be relatively short lived
  E2E_INFRA: datetime.timedelta(hours=3),
  # This should be at least as large as AUTO_INFRA
  E2E_OWNERLESS: datetime.timedelta(days=2),
}

def name_to_infra_type(name):
  """Given a name, map it to the type of infrastructure."""

  if is_match(name, patterns=AUTO_DEPLOY_PATTERNS):
    return AUTO_INFRA

  if is_match(name, patterns=E2E_PATTERNS):
    return E2E_INFRA
  return None

def is_match(name, patterns=None):
  if not patterns:
    patterns = MATCHING
    patterns = E2E_PATTERNS
  for m in patterns:
    if m.match(name):
      return True
@@ -127,12 +140,19 @@ def cleanup_endpoints(args):

for s in results["services"]:
name = s["serviceName"]
if not is_match(name):
infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping endpoint %s; it does not match any infra type.",
name)
unmatched.append(name)
continue

logging.info("Endpoint %s categorized as %s", name, infra_type)

all_rollouts = rollouts.list(serviceName=name).execute()
is_expired = False
max_age = MAX_LIFETIME[infra_type]
if not all_rollouts.get("rollouts", []):
logging.info("Service %s has no rollouts", name)
is_expired = True
@@ -143,7 +163,7 @@
        now = datetime.datetime.now(create_time.tzinfo)

        age = now - create_time
        if age > datetime.timedelta(hours=args.max_age_hours):
        if age > max_age:
          is_expired = True

      if is_expired:
@@ -187,12 +207,20 @@ def cleanup_disks(args):
          break
      for d in results["items"]:
        name = d["name"]
        if not is_match_disk(name):

        infra_type = name_to_infra_type(name)

        if not infra_type:
          logging.info("Skipping disk %s; it does not match any infra type.",
                       name)
          unmatched.append(name)
          continue

        logging.info("Disk %s categorized as %s", name, infra_type)

        max_age = MAX_LIFETIME[infra_type]
        age = getAge(d["creationTimestamp"])
        if age > datetime.timedelta(hours=args.max_age_hours):
        if age > max_age:
          logging.info("Deleting disk: %s, age = %r", name, age)
          if not args.dryrun:
            response = disks.delete(project=args.project, zone=zone,
@@ -231,21 +259,22 @@ def cleanup_firewall_rules(args):
for d in results["items"]:
name = d["name"]

match = False
if is_match(name, patterns=MATCHING_FIREWALL_RULES):
match = True
infra_type = name_to_infra_type(name)

for tag in d.get("targetTags", []):
if is_match(tag, patterns=MATCHING_FIREWALL_RULES):
match = True
break
tag_infra_type = name_to_infra_type(tag)
if tag_infra_type:
infra_type = tag_infra_type

if not match:
if not infra_type:
unmatched.append(name)
continue

logging.info("Firewall rule %s classified as infra type %s", name,
infra_type)
max_age = MAX_LIFETIME[infra_type]
age = getAge(d["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_age_hours):
if age > max_age:
logging.info("Deleting firewall: %s, age = %r", name, age)
if not args.dryrun:
response = firewalls.delete(project=args.project,
@@ -274,6 +303,9 @@ def cleanup_instance_groups(args):
  unexpired = []
  in_use = []

  # TODO(jlewi): We should check whether the instance group is in use
  # before deleting it. At least in pantheon it looks like instance groups
  # are listed as in use by clusters.
  for zone in args.zones.split(","):  # pylint: disable=too-many-nested-blocks
    while True:
      results = instanceGroups.list(project=args.project,
@@ -284,8 +316,17 @@
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):

infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping intance group %s; it does not match any "
"infra type.", name)
continue

logging.info("Instance group %s categorized as %s", name, infra_type)

if age > MAX_LIFETIME[infra_type]:
logging.info("Deleting instanceGroups: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -328,8 +369,7 @@ def cleanup_url_maps(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -372,7 +412,7 @@ def cleanup_target_https_proxies(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_proxy_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -417,8 +457,7 @@ def cleanup_target_http_proxies(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting urlMaps: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -462,7 +501,7 @@ def cleanup_forwarding_rules(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(hours=args.max_proxy_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting forwarding rule: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -506,8 +545,7 @@ def cleanup_backend_services(args):
for s in results["items"]:
name = s["name"]
age = getAge(s["creationTimestamp"])
if age > datetime.timedelta(
hours=args.max_ci_deployment_resource_age_hours):
if age > MAX_LIFETIME[E2E_OWNERLESS]:
logging.info("Deleting backend services: %s, age = %r", name, age)
if not args.dryrun:
try:
@@ -623,14 +661,18 @@ def cleanup_certificates(args):

name = d["name"]
domain = get_ssl_certificate_domain(d)
# Expire e2e certs after 4 hours
if domain.startswith("kfct"):
max_age = datetime.timedelta(hours=4)
else:
# For autodeployments delete after seven days
max_age = datetime.timedelta(days=7)

if age > max_age:
infra_type = name_to_infra_type(domain)

if not infra_type:
logging.info("Skipping certificate named %s for domain %s; "
"it does not match any infra type.", name, domain)
continue

logging.info("Certificate named %s for domain %s categorized as %s",
name, domain, infra_type)

if age > MAX_LIFETIME[infra_type]:
logging.info("Deleting certifcate: %s for domain %s", d["name"], domain)
is_expired = True
if not args.dryrun:
@@ -679,13 +721,16 @@ def cleanup_service_accounts(args):
  # Service accounts don't specify the creation date time. So we
  # use the creation time of the key associated with the account.
  for a in accounts:
    if not is_match(a["email"]):
      logging.info("Skipping key %s; it does not match expected names.",
                   a["email"])
    infra_type = name_to_infra_type(a["email"])

    if not infra_type:
      logging.info("Skipping service account %s; it does not match any "
                   "infra type.", a["email"])
      unmatched_emails.append(a["email"])
      continue

    logging.info("Service account %s categorized as %s", a["email"],
                 infra_type)

    keys = keys_client.list(name=a["name"]).execute()

    is_expired = True
@@ -694,7 +739,7 @@
      now = datetime.datetime.now(valid_time.tzinfo)

      age = now - valid_time
      if age < datetime.timedelta(hours=args.max_age_hours):
      if age < MAX_LIFETIME[infra_type]:
        is_expired = False
        break
    if is_expired:
@@ -824,11 +869,15 @@ def cleanup_deployments(args): # pylint: disable=too-many-statements,too-many-br

name = d["name"]

if not is_match(name):
logging.info("Skipping Deployment %s; it does not match expected names.",
infra_type = name_to_infra_type(name)

if not infra_type:
logging.info("Skipping Deployment %s; it does not match any infra type.",
name)
continue

logging.info("Deployment %s categorized as %s", name, infra_type)

full_insert_time = d.get("insertTime")
age = getAge(full_insert_time)

@@ -838,14 +887,14 @@
d.get("name"), d.get("operation").get("error"))
max_age = datetime.timedelta(minutes=10)
else:
max_age = datetime.timedelta(hours=args.max_age_hours)
max_age = MAX_LIFETIME[infra_type]

if age < max_age:
unexpired.append(name)
logging.info("Deployment %s has not expired", name)
logging.info("Deployment %s has not expired; max age %s", name, max_age)
continue

logging.info("Deployment %s has expired", name)
logging.info("Deployment %s has expired; max_age %s", name, max_age)
expired.append(name)
logging.info("Deleting deployment %s", name)

@@ -884,11 +933,16 @@ def cleanup_clusters(args):
      continue
    for c in clusters["clusters"]:
      name = c["name"]
      if not is_match(name):
        logging.info("Skipping cluster%s; it does not match expected names.",

      infra_type = name_to_infra_type(name)

      if not infra_type:
        logging.info("Skipping cluster %s; it does not match any infra type.",
                     name)
        continue

      logging.info("Cluster %s categorized as %s", name, infra_type)

      full_insert_time = c["createTime"]
      insert_time_str = full_insert_time[:-6]
      tz_offset = full_insert_time[-6:]
@@ -906,8 +960,7 @@
logging.info("Cluster %s is in error state; %s", c["name"], c.get("statusMessage", ""))
max_age = datetime.timedelta(minutes=10)
else:
max_age = datetime.timedelta(hours=args.max_age_hours)

max_age = MAX_LIFETIME[infra_type]

if age > max_age:
if c.get("status", "") == "STOPPING":
@@ -1015,17 +1068,6 @@ def main():
help=("""Whether to GC backend services that are older
than --max_ci_deployment_resource_age_hours."""))

parser.add_argument(
"--max_ci_deployment_resource_age_hours",
default=24, type=int,
help=("The age of resources in kubeflow-ci-deployment to gc."))

parser.add_argument(
"--max_proxy_age_hours",
default=4, type=int,
help=("""The age of forwarding rules, proxy and SSL certificate in
kubeflow-ci-deployment to gc."""))

parser.add_argument(
"--max_wf_age_hours", default=7*24, type=int,
help=("How long to wait before garbage collecting Argo workflows."))
@@ -1116,9 +1158,13 @@ def main():

  parser_clusters.set_defaults(func=cleanup_clusters)


  args = parser.parse_args()

  # Update max age
  MAX_LIFETIME[E2E_INFRA] = datetime.timedelta(hours=args.max_age_hours)

  logging.info("Max lifetime:\n%s", MAX_LIFETIME)

  util.maybe_activate_service_account()

  args.func(args)
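A hypothetical spot-check of the resulting policy, assuming the module-level names shown in the sketch above (the sample names and the five-hour age are invented for illustration):

import datetime

for name in ["kf-vmaster-abc123", "e2e-test-xyz", "kf-vmaster-n01"]:
  infra_type = name_to_infra_type(name)
  if infra_type is None:
    print("%s: unmatched, left alone" % name)
    continue
  age = datetime.timedelta(hours=5)  # pretend resource age
  expired = age > MAX_LIFETIME[infra_type]
  print("%s: %s, expired=%s" % (name, infra_type, expired))

With the lifetimes above, this reports the auto deployment as unexpired (5 hours < 2 days), the e2e deployment as expired (5 hours > 3 hours), and skips the old recycled kf-vmaster-n01 name, which matches neither pattern set.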
