Skip to content

Commit

Permalink
feat: publish certificate-changed event
Browse files Browse the repository at this point in the history
Notify applications when the TLS certificate(s) for some names has
changed.
  • Loading branch information
DavidePrincipi committed Feb 20, 2025
1 parent 725fa09 commit f1a14af
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions imageroot/bin/acmejson-notify
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python3

#
# Copyright (C) 2025 Nethesis S.r.l.
# SPDX-License-Identifier: GPL-3.0-or-later
#

import os
import json
import sys
import agent
import subprocess
import argparse

def main():
argp = argparse.ArgumentParser(
description="Notify applications when a TLS certificate is obtained/renewed via ACME protocol. " +
"If one or more certificates were obtained a \"certificate-update\" event is notified.",
)
argp.add_argument('path', metavar='ACMEJSON-PATH', help="Path to Traefik acme.json file, e.g. acme/acme.json")
args = argp.parse_args()
curr_acme_path = args.path
prev_acme_path = curr_acme_path + '.acmejson-notify'
# Calculate differences between the new acme.json file and its previous version:
changed_certs = parse_acme_certificates(curr_acme_path) - parse_acme_certificates(prev_acme_path)
changed_dns_names = set()
for cert in changed_certs:
changed_dns_names.update(set(cert[0]))
if changed_dns_names:
print("The TLS certificate was changed for the following names:", ', '.join(changed_dns_names))
notify_certificate_changed_event(changed_dns_names)
subprocess.run(['cp', '-pv', curr_acme_path, prev_acme_path])

def parse_acme_certificates(acme_path : str) -> set:
"""Return a set of certificates extractd from acme.json file at
acme_path, represented by tuples of:
- DNS names (tuple of strings),
- certificate (base64 string),
"""
try:
with open(acme_path) as jf:
jacme = json.load(jf)
jcerts = jacme["acmeServer"]["Certificates"]
except (FileNotFoundError, json.JSONDecodeError, KeyError) as ex:
print(f"File {acme_path} is missing, empty or not initialized:", ex, file=sys.stderr)
return set()
certs = set()
for jcert in jcerts:
certs.add((
(jcert['domain']['main'],) + tuple(jcert['domain'].get('sans', [])),
jcert['certificate'],
))
return certs

def notify_certificate_changed_event(changed_dns_names):
rdb = agent.redis_connect(privileged=True)
rdb.publish(f"{os.environ['AGENT_ID']}/event/certificate-changed", json.dumps({
"node_id": int(os.environ['NODE_ID']),
"names": list(changed_dns_names),
}))

if __name__ == "__main__":
main()

0 comments on commit f1a14af

Please sign in to comment.