diff --git a/imageroot/actions/delete-certificate/20writeconfig b/imageroot/actions/delete-certificate/20writeconfig index 5af1f82..eba52cf 100755 --- a/imageroot/actions/delete-certificate/20writeconfig +++ b/imageroot/actions/delete-certificate/20writeconfig @@ -1,44 +1,27 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # -# -# Delete a Let's Encrypt certificate -# Input example: -# -# {"fqdn": "example.com"} -# - import json import sys import os -import agent - -from custom_certificate_manager import delete_custom_certificate, list_custom_certificates - -# Try to parse the stdin as JSON. -# If parsing fails, output everything to stderr -data = json.load(sys.stdin) - -agent_id = os.getenv("AGENT_ID", "") -if not agent_id: - raise Exception("AGENT_ID not found inside the environemnt") - -# Try to delete uploaded certificate -custom_certificate = False -for cert in list_custom_certificates(): - if cert.get('fqdn') == data['fqdn']: - delete_custom_certificate(data['fqdn']) - custom_certificate = True - -# Try to delete the route for obtained certificate -if not custom_certificate: - cert_path = f'configs/certificate-{data["fqdn"]}.yml' - if os.path.isfile(cert_path): - os.unlink(cert_path) - -# Output valid JSON -print("true") +import cert_helpers + +def main(): + request = json.load(sys.stdin) + fqdn = request['fqdn'] + if fqdn in cert_helpers.read_custom_cert_names(): + cert_helpers.remove_custom_cert(fqdn) + elif fqdn in cert_helpers.read_default_cert_names(): + cert_helpers.remove_default_certificate_name(fqdn) + else: + agent.set_status('validation-failed') + json.dump([{'field': 'fqdn','parameter':'fqdn','value': fqdn,'error':'certificate_not_found'}], fp=sys.stdout) + sys.exit(2) + json.dump(True, fp=sys.stdout) + +if __name__ == "__main__": + main() diff --git a/imageroot/actions/delete-certificate/21waitsync b/imageroot/actions/delete-certificate/21waitsync index 196d331..7615ad3 100755 --- a/imageroot/actions/delete-certificate/21waitsync +++ b/imageroot/actions/delete-certificate/21waitsync @@ -1,18 +1,9 @@ -#!/usr/bin/env python3 +#!/bin/bash # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # -import json -import sys -import time -from get_certificate import get_certificate - -data = json.load(sys.stdin) -retry = 0 - -while get_certificate(data).get('fqdn') == data['fqdn'] and retry <= 10: - retry += 1 - time.sleep(1) +# Placeholder, see bug NethServer/dev#7058 +exit 0 diff --git a/imageroot/actions/get-certificate/20readconfig b/imageroot/actions/get-certificate/20readconfig index eac1d8b..4f6ef67 100755 --- a/imageroot/actions/get-certificate/20readconfig +++ b/imageroot/actions/get-certificate/20readconfig @@ -1,23 +1,20 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # -import json import sys +import os +import json +import agent +import cert_helpers -from custom_certificate_manager import info_custom_certificate -from get_certificate import get_certificate - -# Try to parse the stdin as JSON. -# If parsing fails, output everything to stderr - -data = json.load(sys.stdin) -try: - cert_info = info_custom_certificate(data['fqdn']) -except FileNotFoundError: - cert_info = get_certificate(data) +def main(): + request = json.load(sys.stdin) + cert_info = {} + json.dump(cert_info, fp=sys.stdout) -json.dump(cert_info, fp=sys.stdout) +if __name__ == "__main__": + main() diff --git a/imageroot/actions/get-certificate/validate-output.json b/imageroot/actions/get-certificate/validate-output.json index 91a282e..31fd04f 100644 --- a/imageroot/actions/get-certificate/validate-output.json +++ b/imageroot/actions/get-certificate/validate-output.json @@ -6,7 +6,7 @@ "examples": [ { "fqdn": "example.com", - "obtained": "true", + "obtained": true, "type": "internal" } ], diff --git a/imageroot/actions/list-certificates/20readconfig b/imageroot/actions/list-certificates/20readconfig index 4d3e73c..141b917 100755 --- a/imageroot/actions/list-certificates/20readconfig +++ b/imageroot/actions/list-certificates/20readconfig @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # @@ -9,42 +9,36 @@ import json import os import agent import sys -import urllib.request - -from custom_certificate_manager import list_custom_certificates -from get_certificate import get_certificate - - -api_path = os.environ["API_PATH"] - -data = json.load(sys.stdin) - -# Get the list of routers keys -try: - with urllib.request.urlopen(f'http://127.0.0.1/{api_path}/api/http/routers') as res: - traefik_routes = json.load(res) -except urllib.error.URLError as e: - raise Exception(f'Error reaching traefik daemon: {e.reason}') from e -certificates= [] - -# list routes and retrieve either main for a simple list -# or name to use it inside the traefik API and list following type and valid acme cert -for route in traefik_routes: - if "certResolver" in route.get("tls", {}) and route['status'] == 'enabled': - domains = route["tls"]["domains"] - if data != None and data.get('expand_list'): - # we do not use fqdn, we use name : certificate-sub.domain.com@file or nextcloud1-https@file - certificates.append(get_certificate({'name': route['name']})) - else: - certificates.append(domains[0]["main"]) - -# Retrieve custom certificate -if data != None and data.get('expand_list'): - certificates = certificates + list_custom_certificates() -else: - certificates_custom = [] - for item in list_custom_certificates(): - certificates_custom.append(item["fqdn"]) - certificates = certificates + certificates_custom - -json.dump(certificates, fp=sys.stdout) +import cert_helpers + + +def list_certificates_brief(): + return cert_helpers.read_default_cert_names() + cert_helpers.read_custom_cert_names() + +def list_certificates_detailed(): + response = [] + for acmename in cert_helpers.read_default_cert_names(): + response.append({ + "fqdn": acmename, + "type": "internal", + "obtained": cert_helpers.has_acmejson_name(acmename), + }) + for certsubject in cert_helpers.read_custom_cert_names(): + response.append({ + "fqdn": certsubject, + "type": "custom", + "obtained": True, + }) + response.sort(key=lambda item: (item["type"], item["fqdn"])) + return response + +def main(): + request = json.load(sys.stdin) + if request is None or request["expand_list"] is False: + response = list_certificates_brief() + else: + response = list_certificates_detailed() + json.dump(response, fp=sys.stdout) + +if __name__ == "__main__": + main() diff --git a/imageroot/actions/set-certificate/20writeconfig b/imageroot/actions/set-certificate/20writeconfig index 44217d9..3a81248 100755 --- a/imageroot/actions/set-certificate/20writeconfig +++ b/imageroot/actions/set-certificate/20writeconfig @@ -1,41 +1,25 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # -# -# Request a let's encrypt certificate -# Input example: -# {"fqdn": "example.com"} -# - import json import sys import os -import uuid -import yaml - -# Try to parse the stdin as JSON. -# If parsing fails, output everything to stderr -data = json.load(sys.stdin) - -agent_id = os.getenv("AGENT_ID", "") -if not agent_id: - raise Exception("AGENT_ID not found inside the environemnt") +import cert_helpers -# Setup HTTPS router -path = uuid.uuid4() -router = { - 'entryPoints': ["https"], - 'service': "ping@internal", - 'rule' : f'Host(`{data["fqdn"]}`) && Path(`/{path}`)', - 'priority': '1', - 'tls': { 'domains': [{'main': data["fqdn"]}], 'certresolver': "acmeServer"} - } +def main(): + request = json.load(sys.stdin) + cert_helpers.add_default_certificate_name(request['fqdn']) + if request.get('sync'): + obtained = cert_helpers.wait_acmejson_sync(timeout=request.get('sync_timeout', 120)) + else: + obtained = False + json.dump({"obtained": obtained}, fp=sys.stdout) + if request.get('sync') is not None and obtained is False: + exit(2) -# Write configuration file -config = {"http": {"routers": {f'certificate-{data["fqdn"]}': router}}} -with open(f'configs/certificate-{data["fqdn"]}.yml', 'w') as fp: - fp.write(yaml.safe_dump(config, default_flow_style=False, sort_keys=False, allow_unicode=True)) +if __name__ == "__main__": + main() diff --git a/imageroot/actions/set-certificate/21waitsync b/imageroot/actions/set-certificate/21waitsync old mode 100755 new mode 100644 index 386d401..7615ad3 --- a/imageroot/actions/set-certificate/21waitsync +++ b/imageroot/actions/set-certificate/21waitsync @@ -1,37 +1,9 @@ -#!/usr/bin/env python3 +#!/bin/bash # -# Copyright (C) 2023 Nethesis S.r.l. +# Copyright (C) 2025 Nethesis S.r.l. # SPDX-License-Identifier: GPL-3.0-or-later # -import json -import sys -import time -import agent -from get_certificate import get_certificate - -data = json.load(sys.stdin) -retry = 0 -certificate = {} - -sync_timeout = data['sync_timeout'] if data.get('sync_timeout') is not None else 120 - -while get_certificate(data).get('fqdn') != data['fqdn'] and retry <= 10: - retry += 1 - time.sleep(1) - -certificate['obtained'] = get_certificate(data).get('obtained') - -if certificate['obtained'] is False and data.get('sync') is not None and data['sync'] is True: - retry = 0 - while certificate['obtained'] != True and retry < sync_timeout: - agent.set_progress(round((retry*100)/sync_timeout)) - certificate['obtained'] = get_certificate(data).get('obtained') - retry += 1 - time.sleep(1) - -json.dump(certificate, fp=sys.stdout) - -if data.get('sync') is not None and certificate['obtained'] is False: - exit(2) +# Placeholder, see bug NethServer/dev#7058 +exit 0 diff --git a/imageroot/pypkg/custom_certificate_manager.py b/imageroot/pypkg/custom_certificate_manager.py deleted file mode 100644 index 92cef6a..0000000 --- a/imageroot/pypkg/custom_certificate_manager.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 - -# -# Copyright (C) 2023 Nethesis S.r.l. -# SPDX-License-Identifier: GPL-3.0-or-later -# - - -import agent -import os -from pathlib import Path - -CUSTOM_CERTIFICATES_DIR = 'custom_certificates' -CUSTOM_TRAEFIK_CONFIG_DIR = 'configs' -CRT_SUFFIX = '.crt' -KEY_SUFFIX = '.key' - - -def info_custom_certificate(fqdn): - """Get info for custom certificate - - :param fqdn: The FQDN to find in the certificates - :type fqdn: str - :return: dictionary containing custom certificate info - :rtype: dict - :raises FileNotFoundError: if no certificate is found for the given FQDN - """ - custom_cert_path = Path(CUSTOM_CERTIFICATES_DIR) - for item in custom_cert_path.iterdir(): - if item.is_file() and item.name == fqdn + CRT_SUFFIX: - return { - 'fqdn': fqdn, - 'type': 'custom', - 'obtained': True - } - - raise FileNotFoundError(f'Can\'t find custom certificate for {fqdn}.') - - -def list_custom_certificates(): - """Returns a list of custom certificates uploaded - - :return: list containing data for each custom_certificate found, see info_custom_certificate for more info - :rtype: list - """ - custom_certificates = [] - custom_cert_path = Path(CUSTOM_CERTIFICATES_DIR) - for item in custom_cert_path.iterdir(): - if item.is_file() and item.suffix == CRT_SUFFIX: - custom_certificates.append(info_custom_certificate(item.name.removesuffix(CRT_SUFFIX))) - - return custom_certificates - - -def delete_custom_certificate(fqdn): - """Delete custom certificate from system and Traefik configuration - - :param fqdn: FQDN to delete from filesystem - :type fqdn: str - :raises FileNotFoundError: if certificate or key is missing - """ - cert_file_path = Path(CUSTOM_CERTIFICATES_DIR + f'/{fqdn}{CRT_SUFFIX}') - key_file_path = Path(CUSTOM_CERTIFICATES_DIR + f'/{fqdn}{KEY_SUFFIX}') - cert_config_path = Path(CUSTOM_TRAEFIK_CONFIG_DIR + f'/certificate_{fqdn}.yml') - # check the presence of both file before removing, ensures that both are deleted, avoiding system inconsistencies - if cert_file_path.is_file() and key_file_path.is_file() and cert_config_path.is_file(): - cert_file_path.unlink() - key_file_path.unlink() - cert_config_path.unlink() - # remove the certificate and key from redis - rdb = agent.redis_connect(privileged=True) - rdb.delete(f'module/{os.environ["MODULE_ID"]}/certificate/{fqdn}') - - else: - raise FileNotFoundError(f'Invalid custom certificate state for {fqdn}.') diff --git a/imageroot/pypkg/get_certificate.py b/imageroot/pypkg/get_certificate.py deleted file mode 100755 index 6dc7a25..0000000 --- a/imageroot/pypkg/get_certificate.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python3 - -# -# Copyright (C) 2023 Nethesis S.r.l. -# SPDX-License-Identifier: GPL-3.0-or-later -# - -import json -import os -import agent -import urllib.request - -def get_certificate(data): - try: - name = data.get('name','') - fqdn = data.get('fqdn','') - certificate = {} - api_path = os.environ["API_PATH"] - moduleid = os.environ["MODULE_ID"] - - # we retrieve route and certificate for list-certificates action - if name != "": - with urllib.request.urlopen(f'http://127.0.0.1/{api_path}/api/http/routers/{name}') as res: - traefik_https_route = json.load(res) - - # we retrieve cert from fqdn for backward compatibility (it is an acme certificate) - elif fqdn != "": - with urllib.request.urlopen(f'http://127.0.0.1/{api_path}/api/http/routers/certificate-{fqdn}@file') as res: - traefik_https_route = json.load(res) - - # Check if the route is ready to use - if traefik_https_route['status'] == 'disabled': - return {} - - certificate['fqdn'] = traefik_https_route['tls']['domains'][0]['main'] - # either from internal or route (type could be also custom cert) - certificate['type'] = 'internal' if traefik_https_route['name'].startswith('certificate-') else 'route' - certificate['obtained'] = False - - # Open the certificates storage file - with open(f'/home/{moduleid}/.local/share/containers/storage/volumes/traefik-acme/_data/acme.json') as f: - acme_storage = json.load(f) - - resolver = traefik_https_route['tls']['certResolver'] - certificates = acme_storage[resolver].get('Certificates') - - # Check if the certificate is present in the storage - for cert in certificates if certificates else []: - if cert['domain']['main'] == certificate['fqdn']: - certificate['obtained'] = True - break - - except urllib.error.HTTPError as e: - if e.code == 404: - # If the certificate is not found, return an empty JSON object - pass - - except urllib.error.URLError as e: - raise Exception(f'Error reaching traefik daemon: {e.reason}') from e - - except json.decoder.JSONDecodeError: - # The acme storage is empty or corrupted, return the certificate as requested but not obtained - pass - - return certificate