diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000000000000000000000000000000000000..45a5a77d1c8b3bea1dc9ba7ac9b46ab614f6dfd8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +/docker +/.gitlab-ci.yml +/.git diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..683866d2d09ba0d1a6c2beb236e8705e3b533c54 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,114 @@ +stages: + - build + - check + - unit_test + - lets_encrypt_staging + +.build: + stage: build + image: docker:stable + only: + - merge_requests + - master + +.check: + stage: check + image: acme-dns-tiny:buster-slim + only: + - merge_requests + - master + +.unit_test: + stage: unit_test + script: + - python3-coverage run --append --source ./ -m unittest -v + tests.unit_test_acme_dns_tiny + only: + - merge_requests + - master + +.lets_encrypt_staging: + stage: lets_encrypt_staging + script: + - python3-coverage run --append --source ./ -m unittest -v + tests.staging_test_acme_dns_tiny + tests.staging_test_acme_account_rollover + tests.staging_test_acme_account_deactivate + only: + - merge_requests + - master + +jessie-slim: + extends: .build + script: + - docker build -t "acme-dns-tiny:jessie-slim" + -f "docker/jessie/Dockerfile" . + +stretch-slim: + extends: .build + script: + - docker build -t "acme-dns-tiny:stretch-slim" + -f "docker/stretch/Dockerfile" . + +buster-slim: + extends: .build + script: + - docker build -t "acme-dns-tiny:buster-slim" + -f "docker/buster/Dockerfile" . + +compile: + extends: .check + script: + - python3 -m py_compile acme_dns_tiny.py tools/*.py tests/*.py + +lint: + extends: .check + script: + - pylint3 acme_dns_tiny.py + - pylint3 tools/acme_account_deactivate.py + - pylint3 tools/acme_account_rollover.py + - pylint3 tests/config_factory.py + - pylint3 tests/staging_test_acme_dns_tiny.py + - pylint3 tests/unit_test_acme_dns_tiny.py + - pylint3 tests/staging_test_acme_account_deactivate.py + - pylint3 tests/staging_test_acme_account_rollover.py + +pep8: + extends: .check + script: + - pycodestyle --max-line-length=100 --ignore=E401,W503 --exclude=tests . + - pycodestyle --max-line-length=100 --ignore=E722 tests + +jessie-ut: + extends: .unit_test + image: acme-dns-tiny:jessie-slim + +stretch-ut: + extends: .unit_test + image: acme-dns-tiny:stretch-slim + +buster-ut: + extends: .unit_test + image: acme-dns-tiny:buster-slim + artifacts: + paths: + - .coverage + +jessie-le-staging: + extends: .lets_encrypt_staging + image: acme-dns-tiny:jessie-slim + +stretch-le-staging: + extends: .lets_encrypt_staging + image: acme-dns-tiny:stretch-slim + +buster-le-staging: + extends: .lets_encrypt_staging + image: acme-dns-tiny:buster-slim + after_script: + - python3-coverage report + --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py + - python3-coverage html + artifacts: + paths: + - htmlcov diff --git a/LICENSE b/LICENSE index ba546612aae4e576e90da964ced14179ecf270bb..92656081506e9a97fb023491635e7b1b3d706baa 100644 --- a/LICENSE +++ b/LICENSE @@ -1,7 +1,7 @@ The MIT License (MIT) Copyright (c) 2015 Daniel Roesler -Copyright (c) 2016 Adrien Dorsaz +Copyright (c) 2016-2020 Adrien Dorsaz Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile index 59cf21993cf48be04523dec236ed282925351b61..2a27135f39c562150167ca7dc6d3ccab13b11f29 100644 --- a/Makefile +++ b/Makefile @@ -6,13 +6,13 @@ unit_test_acme_dns_tiny_success_san: python3 -m unittest tests.test_acme_dns_tiny.TestACMEDNSTiny.test_success_san unit_test_acme_account_rollover: - python3 -m unittest tests.test_acme_account_rollover.TestACMEAccountRollover.test_success_account_rollover + python3 -m unittest tests.staging_test_acme_account_rollover.TestACMEAccountRollover.test_success_account_rollover unit_test_acme_account_deactivate: - python3 -m unittest tests.test_acme_account_deactivate.TestACMEAccountDeactivate.test_success_account_deactivate + python3 -m unittest tests.staging_test_acme_account_deactivate.TestACMEAccountDeactivate.test_success_account_deactivate unit_test_all_with_coverage: - python3-coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate + python3-coverage run --source ./ -m unittest -v tests.unit_test_acme_dns_tiny tests.staging_test_acme_dns_tiny tests.staging_test_acme_account_rollover tests.staging_test_acme_account_deactivate python3-coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py python3-coverage html diff --git a/README.md b/README.md index 12fd1b21a34e39b92bad60a88c54858518d91f57..f23a1afba1ccd51e91ec77d3d2bab8c9439d825a 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ above files. This user should *NOT* have access to your domain key! This project has a very, very limited scope and codebase. The project is happy to receive bug reports and pull requests, but please don't add any new features. -This script must stay under ~250 lines of code to ensure it can be easily +This script must stay under ~400 lines of code to ensure it can be easily audited by anyone who wants to run it. If you want to add features for your own setup to make things easier for you, diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py index 5d405a302db23dc18a79ed9f9b0ee1513c924bd8..f36e378baac4b1c4656a091d3557fa67a568a587 100644 --- a/acme_dns_tiny.py +++ b/acme_dns_tiny.py @@ -1,225 +1,267 @@ #!/usr/bin/env python3 -import argparse, subprocess, requests, json, sys, base64, binascii, time, hashlib, re, copy, logging, configparser -import dns.resolver, dns.tsigkeyring, dns.update +# pylint: disable=multiple-imports +"""ACME client to met DNS challenge and receive TLS certificate""" +import argparse, base64, binascii, configparser, copy, hashlib, json, logging +import re, sys, subprocess, time +import requests, dns.resolver, dns.tsigkeyring, dns.update LOGGER = logging.getLogger('acme_dns_tiny') LOGGER.addHandler(logging.StreamHandler()) + +def _base64(text): + """Encodes string as base64 as specified in the ACME RFC.""" + return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") + + +def _openssl(command, options, communicate=None): + """Run openssl command line and raise IOError on non-zero return.""" + openssl = subprocess.Popen(["openssl", command] + options, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = openssl.communicate(communicate) + if openssl.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + return out + + +# pylint: disable=too-many-locals,too-many-branches,too-many-statements def get_crt(config, log=LOGGER): - def _b64(b): - """"Encodes string as base64 as specified in ACME RFC """ - return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") - - def _openssl(command, options, communicate=None): - """Run openssl command line and raise IOError on non-zero return.""" - openssl = subprocess.Popen(["openssl", command] + options, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = openssl.communicate(communicate) - if openssl.returncode != 0: - raise IOError("OpenSSL Error: {0}".format(err)) - return out + """Get ACME certificate by resolving DNS challenge.""" def _update_dns(rrset, action): """Updates DNS resource by adding or deleting resource.""" algorithm = dns.name.from_text("{0}".format(config["TSIGKeyring"]["Algorithm"].lower())) - dns_update = dns.update.Update(config["DNS"]["zone"], keyring=keyring, keyalgorithm=algorithm) + dns_update = dns.update.Update(config["DNS"]["zone"], + keyring=private_keyring, keyalgorithm=algorithm) if action == "add": dns_update.add(rrset.name, rrset) elif action == "delete": dns_update.delete(rrset.name, rrset) - response = dns.query.tcp(dns_update, config["DNS"]["Host"], port=config.getint("DNS", "Port")) + response = dns.query.tcp(dns_update, config["DNS"]["Host"], + port=config.getint("DNS", "Port")) dns_update = None return response - def _send_signed_request(url, payload): + def _send_signed_request(url, payload, extra_headers=None): """Sends signed requests to ACME server.""" - nonlocal jws_nonce - if payload == "": # on POST-as-GET, final payload has to be just empty string + nonlocal nonce + if payload == "": # on POST-as-GET, final payload has to be just empty string payload64 = "" else: - payload64 = _b64(json.dumps(payload).encode("utf8")) - protected = copy.deepcopy(jws_header) - protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] + payload64 = _base64(json.dumps(payload).encode("utf8")) + protected = copy.deepcopy(private_acme_signature) + protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] protected["url"] = url if url == acme_config["newAccount"]: - del protected["kid"] + if "kid" in protected: + del protected["kid"] else: del protected["jwk"] - protected64 = _b64(json.dumps(protected).encode("utf8")) + protected64 = _base64(json.dumps(protected).encode("utf8")) signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]], "{0}.{1}".format(protected64, payload64).encode("utf8")) jose = { - "protected": protected64, "payload": payload64,"signature": _b64(signature) + "protected": protected64, "payload": payload64, "signature": _base64(signature) } + joseheaders = {'Content-Type': 'application/jose+json'} + joseheaders.update(adtheaders) + joseheaders.update(extra_headers or {}) try: response = requests.post(url, json=jose, headers=joseheaders) except requests.exceptions.RequestException as error: response = error.response finally: - jws_nonce = response.headers['Replay-Nonce'] - try: - return response, response.json() - except ValueError as error: - return response, json.dumps({}) + nonce = response.headers['Replay-Nonce'] + try: + return response, response.json() + except ValueError: # if body is empty or not JSON formatted + return response, json.dumps({}) # main code - adtheaders = {'User-Agent': 'acme-dns-tiny/2.1', - 'Accept-Language': config["acmednstiny"].get("Language", "en") - } - joseheaders=copy.deepcopy(adtheaders) - joseheaders['Content-Type']='application/jose+json' + adtheaders = {'User-Agent': 'acme-dns-tiny/2.2', + 'Accept-Language': config["acmednstiny"].get("Language", "en")} + nonce = None - log.info("Fetch informations from the ACME directory.") - directory = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adtheaders) - acme_config = directory.json() - terms_service = acme_config.get("meta", {}).get("termsOfService", "") + log.info("Find domains to validate from the Certificate Signing Request (CSR) file.") + csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], + "-noout", "-text"]).decode("utf8") + domains = set() + common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr) + if common_name is not None: + domains.add(common_name.group(1)) + subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \r?\n +([^\r\n]+)\r?\n", csr, + re.MULTILINE | re.DOTALL) + if subject_alt_names is not None: + for san in subject_alt_names.group(1).split(", "): + if san.startswith("DNS:"): + domains.add(san[4:]) + if len(domains) == 0: # pylint: disable=len-as-condition + raise ValueError("Didn't find any domain to validate in the provided CSR.") - log.info("Prepare DNS keyring and resolver.") - keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: config["TSIGKeyring"]["KeyValue"]}) + log.info("Configure DNS client tools.") + # That keyring is used to authenticate with the DNS server, it needs to be safely kept + private_keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: + config["TSIGKeyring"]["KeyValue"]}) resolver = dns.resolver.Resolver(configure=False) resolver.retry_servfail = True nameserver = [] try: - nameserver = [ipv4_rrset.to_text() for ipv4_rrset in dns.resolver.query(config["DNS"]["Host"], rdtype="A")] - nameserver = nameserver + [ipv6_rrset.to_text() for ipv6_rrset in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")] - except dns.exception.DNSException as e: - log.info("A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if one exists or directly the DNS Host configuration.") + nameserver = [ipv4_rrset.to_text() for ipv4_rrset + in dns.resolver.query(config["DNS"]["Host"], rdtype="A")] + nameserver = nameserver + [ipv6_rrset.to_text() for ipv6_rrset + in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")] + except dns.exception.DNSException: + log.info(("A and/or AAAA DNS resources not found for configured dns host: we will use " + "either resource found if one exists or directly the DNS Host configuration.")) if not nameserver: nameserver = [config["DNS"]["Host"]] resolver.nameservers = nameserver - log.info("Read account key.") - accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], "-noout", "-text"]) + log.info("Get private signature from account key.") + accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], + "-noout", "-text"]) pub_hex, pub_exp = re.search( r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp - jws_header = { + # That signature is used to authenticate with the ACME server, it needs to be safely kept + private_acme_signature = { "alg": "RS256", "jwk": { - "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), + "e": _base64(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", - "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), + "n": _base64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), }, - "kid": None, } - accountkey_json = json.dumps(jws_header["jwk"], sort_keys=True, separators=(",", ":")) - jwk_thumbprint = _b64(hashlib.sha256(accountkey_json.encode("utf8")).digest()) - jws_nonce = None + private_jwk = json.dumps(private_acme_signature["jwk"], sort_keys=True, separators=(",", ":")) + jwk_thumbprint = _base64(hashlib.sha256(private_jwk.encode("utf8")).digest()) - log.info("Read CSR to find domains to validate.") - csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], "-noout", "-text"]).decode("utf8") - domains = set() - common_name = re.search(r"Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)", csr) - if common_name is not None: - domains.add(common_name.group(1)) - subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \r?\n +([^\r\n]+)\r?\n", csr, re.MULTILINE | re.DOTALL) - if subject_alt_names is not None: - for san in subject_alt_names.group(1).split(", "): - if san.startswith("DNS:"): - domains.add(san[4:]) - if len(domains) == 0: - raise ValueError("Didn't find any domain to validate in the provided CSR.") + log.info("Fetch ACME server configuration from the its directory URL.") + acme_config = requests.get(config["acmednstiny"]["ACMEDirectory"], headers=adtheaders).json() + terms_service = acme_config.get("meta", {}).get("termsOfService", "") - log.info("Register ACME Account.") + log.info("Register ACME Account to get the account identifier.") account_request = {} - if terms_service != "": + if terms_service: account_request["termsOfServiceAgreed"] = True - log.warning("Terms of service exists and will be automatically agreed, please read them: {0}".format(terms_service)) + log.warning(("Terms of service exist and will be automatically agreed if possible, " + "you should read them: %s"), terms_service) account_request["contact"] = config["acmednstiny"].get("Contacts", "").split(';') if account_request["contact"] == [""]: del account_request["contact"] http_response, account_info = _send_signed_request(acme_config["newAccount"], account_request) if http_response.status_code == 201: - jws_header["kid"] = http_response.headers['Location'] - log.info(" - Registered a new account: '{0}'".format(jws_header["kid"])) + private_acme_signature["kid"] = http_response.headers['Location'] + log.info(" - Registered a new account: '%s'", private_acme_signature["kid"]) elif http_response.status_code == 200: - jws_header["kid"] = http_response.headers['Location'] - log.debug(" - Account is already registered: '{0}'".format(jws_header["kid"])) + private_acme_signature["kid"] = http_response.headers['Location'] + log.debug(" - Account is already registered: '%s'", private_acme_signature["kid"]) - http_response, account_info = _send_signed_request(jws_header["kid"], {}) + http_response, account_info = _send_signed_request(private_acme_signature["kid"], {}) else: - raise ValueError("Error registering account: {0} {1}".format(http_response.status_code, account_info)) + raise ValueError("Error registering account: {0} {1}" + .format(http_response.status_code, account_info)) log.info("Update contact information if needed.") - if ("contact" in account_request and set(account_request["contact"]) != set(account_info["contact"])): - http_response, result = _send_signed_request(jws_header["kid"], account_request) + if ("contact" in account_request + and set(account_request["contact"]) != set(account_info["contact"])): + http_response, result = _send_signed_request(private_acme_signature["kid"], account_request) if http_response.status_code == 200: log.debug(" - Account updated with latest contact informations.") else: - raise ValueError("Error registering updates for the account: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error registering updates for the account: {0} {1}" + .format(http_response.status_code, result)) # new order log.info("Request to the ACME server an order to validate domains.") - new_order = { "identifiers": [{"type": "dns", "value": domain} for domain in domains]} + new_order = {"identifiers": [{"type": "dns", "value": domain} for domain in domains]} http_response, order = _send_signed_request(acme_config["newOrder"], new_order) if http_response.status_code == 201: order_location = http_response.headers['Location'] - log.debug(" - Order received: {0}".format(order_location)) + log.debug(" - Order received: %s", order_location) if order["status"] != "pending" and order["status"] != "ready": - raise ValueError("Order status is neither pending neither ready, we can't use it: {0}".format(order)) + raise ValueError("Order status is neither pending neither ready, we can't use it: {0}" + .format(order)) elif (http_response.status_code == 403 - and order["type"] == "urn:ietf:params:acme:error:userActionRequired"): - raise ValueError("Order creation failed ({0}). Read Terms of Service ({1}), then follow your CA instructions: {2}".format(order["detail"], http_response.headers['Link'], order["instance"])) + and order["type"] == "urn:ietf:params:acme:error:userActionRequired"): + raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow " + "your CA instructions: {2}") + .format(order["detail"], http_response.headers['Link'], order["instance"])) else: - raise ValueError("Error getting new Order: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error getting new Order: {0} {1}" + .format(http_response.status_code, order)) # complete each authorization challenge for authz in order["authorizations"]: if order["status"] == "ready": log.info("No challenge to process: order is already ready.") - break; + break - log.info("Process challenge for authorization: {0}".format(authz)) + log.info("Process challenge for authorization: %s", authz) # get new challenge http_response, authorization = _send_signed_request(authz, "") if http_response.status_code != 200: - raise ValueError("Error fetching challenges: {0} {1}".format(http_response.status_code, authorization)) + raise ValueError("Error fetching challenges: {0} {1}" + .format(http_response.status_code, authorization)) domain = authorization["identifier"]["value"] - log.info("Install DNS TXT resource for domain: {0}".format(domain)) + log.info("Install DNS TXT resource for domain: %s", domain) challenge = [c for c in authorization["challenges"] if c["type"] == "dns-01"][0] token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"]) keyauthorization = "{0}.{1}".format(token, jwk_thumbprint) - keydigest64 = _b64(hashlib.sha256(keyauthorization.encode("utf8")).digest()) + keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest()) dnsrr_domain = "_acme-challenge.{0}.".format(domain) - try: # a CNAME resource can be used for advanced TSIG configuration - # Note: the CNAME target has to be of "non-CNAME" type to be able to add TXT records aside it - dnsrr_domain = [response.to_text() for response in resolver.query(dnsrr_domain, rdtype="CNAME")][0] - log.info(" - A CNAME resource has been found for this domain, will install TXT on {0}".format(dnsrr_domain)) + try: # a CNAME resource can be used for advanced TSIG configuration + # Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed) + dnsrr_domain = [response.to_text() for response + in resolver.query(dnsrr_domain, rdtype="CNAME")][0] + log.info(" - A CNAME resource has been found for this domain, will install TXT on %s", + dnsrr_domain) except dns.exception.DNSException as dnsexception: - log.debug(" - Not any CNAME resource has been found for this domain ({1}), will install TXT directly on {0}".format(dnsrr_domain, type(dnsexception).__name__)) - dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"), "IN", "TXT", '"{0}"'.format(keydigest64)) + log.debug((" - Not any CNAME resource has been found for this domain (%s), will " + "install TXT directly on %s"), dnsrr_domain, type(dnsexception).__name__) + dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"), + "IN", "TXT", '"{0}"'.format(keydigest64)) try: _update_dns(dnsrr_set, "add") except dns.exception.DNSException as dnsexception: - raise ValueError("Error updating DNS records: {0} : {1}".format(type(dnsexception).__name__, str(dnsexception))) + raise ValueError("Error updating DNS records: {0} : {1}" + .format(type(dnsexception).__name__, str(dnsexception))) - log.info("Waiting for 1 TTL ({0} seconds) before starting self challenge check.".format(config["DNS"].getint("TTL"))) + log.info("Wait for 1 TTL (%s seconds) to ensure DNS cache is cleared.", + config["DNS"].getint("TTL")) time.sleep(config["DNS"].getint("TTL")) challenge_verified = False number_check_fail = 1 while challenge_verified is False: try: - log.debug('Self test (try: {0}): Check resource with value "{1}" exits on nameservers: {2}'.format(number_check_fail, keydigest64, resolver.nameservers)) + log.debug(('Self test (try: %s): Check resource with value "%s" exits on ' + 'nameservers: %s'), number_check_fail, keydigest64, resolver.nameservers) for response in resolver.query(dnsrr_domain, rdtype="TXT").rrset: - log.debug(" - Found value {0}".format(response.to_text())) - challenge_verified = challenge_verified or response.to_text() == '"{0}"'.format(keydigest64) + log.debug(" - Found value %s", response.to_text()) + challenge_verified = (challenge_verified + or response.to_text() == '"{0}"'.format(keydigest64)) except dns.exception.DNSException as dnsexception: - log.debug(" - Will retry as a DNS error occurred while checking challenge: {0} : {1}".format(type(dnsexception).__name__, dnsexception)) + log.debug( + " - Will retry as a DNS error occurred while checking challenge: %s : %s", + type(dnsexception).__name__, dnsexception) finally: if challenge_verified is False: if number_check_fail >= 10: - raise ValueError("Error checking challenge, value not found: {0}".format(keydigest64)) + raise ValueError("Error checking challenge, value not found: {0}" + .format(keydigest64)) number_check_fail = number_check_fail + 1 time.sleep(config["DNS"].getint("TTL")) log.info("Asking ACME server to validate challenge.") - http_response, result = _send_signed_request(challenge["url"], {"keyAuthorization": keyauthorization}) + http_response, result = _send_signed_request(challenge["url"], + {"keyAuthorization": keyauthorization}) if http_response.status_code != 200: - raise ValueError("Error triggering challenge: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error triggering challenge: {0} {1}" + .format(http_response.status_code, result)) try: while True: http_response, challenge_status = _send_signed_request(challenge["url"], "") @@ -229,7 +271,7 @@ def get_crt(config, log=LOGGER): if challenge_status["status"] == "pending": time.sleep(2) elif challenge_status["status"] == "valid": - log.info("ACME has verified challenge for domain: {0}".format(domain)) + log.info("ACME has verified challenge for domain: %s", domain) break else: raise ValueError("Challenge for domain {0} did not pass: {1}".format( @@ -238,10 +280,11 @@ def get_crt(config, log=LOGGER): _update_dns(dnsrr_set, "delete") log.info("Request to finalize the order (all chalenge have been completed)") - csr_der = _b64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"], "-outform", "DER"])) + csr_der = _base64(_openssl("req", ["-in", config["acmednstiny"]["CSRFile"], "-outform", "DER"])) http_response, result = _send_signed_request(order["finalize"], {"csr": csr_der}) if http_response.status_code != 200: - raise ValueError("Error while sending the CSR: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error while sending the CSR: {0} {1}" + .format(http_response.status_code, result)) while True: http_response, order = _send_signed_request(order_location, "") @@ -258,52 +301,62 @@ def get_crt(config, log=LOGGER): raise ValueError("Finalizing order {0} got errors: {1}".format( domain, order)) - joseheaders['Accept'] = config["acmednstiny"].get("CertificateFormat", 'application/pem-certificate-chain') - http_response, result = _send_signed_request(order["certificate"], "") + http_response, result = _send_signed_request( + order["certificate"], "", + {'Accept': config["acmednstiny"].get("CertificateFormat", + 'application/pem-certificate-chain')}) if http_response.status_code != 200: - raise ValueError("Finalizing order {0} got errors: {1}".format(http_response.status_code, result)) + raise ValueError("Finalizing order {0} got errors: {1}" + .format(http_response.status_code, result)) if 'link' in http_response.headers: - log.info(" - Certificate links given by server: {0}".format(http_response.headers['link'])) + log.info(" - Certificate links given by server: %s", http_response.headers['link']) - log.info("Certificate signed and chain received: {0}".format(order["certificate"])) + log.info("Certificate signed and chain received: %s", order["certificate"]) return http_response.text + def main(argv): + """Parse arguments and get certificate.""" parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description="Tiny ACME client to get TLS certificate by responding to DNS challenges.", epilog="""As the script requires access to your private ACME account key and dns server, -so PLEASE READ THROUGH IT (it's about 300 lines, so it won't take long) ! +so PLEASE READ THROUGH IT (it won't take too long, it's a one-file script) ! Example: requests certificate chain and store it in chain.crt python3 acme_dns_tiny.py ./example.ini > chain.crt See example.ini file to configure correctly this script.""" ) - parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="show only errors on stderr") - parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, help="show all debug informations on stderr") - parser.add_argument("--csr", help="specifies CSR file path to use instead of the CSRFile option from the configuration file.") + parser.add_argument("--quiet", action="store_const", const=logging.ERROR, + help="show only errors on stderr") + parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, + help="show all debug informations on stderr") + parser.add_argument("--csr", + help="specifies CSR file path to use instead of the CSRFile option \ +from the configuration file.") parser.add_argument("configfile", help="path to your configuration file") args = parser.parse_args(argv) config = configparser.ConfigParser() - config.read_dict({"acmednstiny": {"ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory"}, - "DNS": {"Port": 53, - "TTL": 10}}) + config.read_dict({ + "acmednstiny": {"ACMEDirectory": "https://acme-staging-v02.api.letsencrypt.org/directory"}, + "DNS": {"Port": 53, "TTL": 10}}) config.read(args.configfile) - if args.csr : + if args.csr: config.set("acmednstiny", "csrfile", args.csr) if (set(["accountkeyfile", "csrfile", "acmedirectory"]) - set(config.options("acmednstiny")) - or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring")) - or set(["zone", "host", "port", "ttl"]) - set(config.options("DNS"))): + or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring")) + or set(["zone", "host", "port", "ttl"]) - set(config.options("DNS"))): raise ValueError("Some required settings are missing.") LOGGER.setLevel(args.verbose or args.quiet or logging.INFO) - signed_crt = get_crt(config, log=LOGGER) + signed_crt = get_crt(config, LOGGER) sys.stdout.write(signed_crt) + if __name__ == "__main__": # pragma: no cover main(sys.argv[1:]) diff --git a/docker/buster/Dockerfile b/docker/buster/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..f45671afbf7d2852409847c65dce4a8b5b94ae91 --- /dev/null +++ b/docker/buster/Dockerfile @@ -0,0 +1,13 @@ +FROM debian:buster-slim + +WORKDIR acme_dns_tiny + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + python3-minimal python3-dnspython python3-requests \ + pylint3 \ + # install recommends for coverage, to include jquery + && apt-get install -y python3-coverage pycodestyle \ + && apt-get clean + +COPY . . diff --git a/docker/jessie/Dockerfile b/docker/jessie/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..1572c8c00513c8d6fcf511ce9dbe54afd5fbe53c --- /dev/null +++ b/docker/jessie/Dockerfile @@ -0,0 +1,11 @@ +FROM debian:jessie-slim + +WORKDIR acme_dns_tiny + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + python3-minimal python3-dnspython python3-requests \ + python3-coverage \ + && apt-get clean + +COPY . . diff --git a/docker/stretch/Dockerfile b/docker/stretch/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..1572c8c00513c8d6fcf511ce9dbe54afd5fbe53c --- /dev/null +++ b/docker/stretch/Dockerfile @@ -0,0 +1,11 @@ +FROM debian:jessie-slim + +WORKDIR acme_dns_tiny + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + python3-minimal python3-dnspython python3-requests \ + python3-coverage \ + && apt-get clean + +COPY . . diff --git a/gitlab-ci/docker/jessie/Dockerfile b/gitlab-ci/docker/jessie/Dockerfile deleted file mode 100644 index fe5da85bcb90bb9ef613a3a181f4359ab59befc2..0000000000000000000000000000000000000000 --- a/gitlab-ci/docker/jessie/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM debian:jessie-slim - -# Minimal tools required by acme-dns-tiny CI -RUN apt-get update \ - && apt-get install -y --no-install-recommends \ - python3-dnspython \ - python3-coverage \ - python3-pip \ - && apt-get clean - -# Allows run python3-coverage with same command than manual install by pip -RUN update-alternatives --install \ - /usr/bin/python3-coverage \ - coverage \ - /usr/bin/python3.4-coverage \ - 1 - -RUN ln -s /etc/alternatives/coverage /usr/bin/coverage diff --git a/gitlab-ci/docker/stretch/Dockerfile b/gitlab-ci/docker/stretch/Dockerfile deleted file mode 100644 index 04729c890b987350a6bc35fdcad74bdfdfbcbc2b..0000000000000000000000000000000000000000 --- a/gitlab-ci/docker/stretch/Dockerfile +++ /dev/null @@ -1,16 +0,0 @@ -FROM debian:stretch-slim - -# Minimal tools required by acme-dns-tiny CI -RUN apt-get update \ - && apt-get install -y --no-install-recommends \ - python3-dnspython \ - python3-coverage \ - python3-configargparse \ - python3-pip - -# Allows run python3-coverage with same command than manual install by pip -RUN update-alternatives --install \ - /usr/bin/coverage \ - coverage \ - /usr/bin/python3-coverage \ - 1 diff --git a/gitlab-ci/gitlab-ci.yml b/gitlab-ci/gitlab-ci.yml deleted file mode 100644 index 482513bc78666cea91a639d46aaea3c0259ec5cd..0000000000000000000000000000000000000000 --- a/gitlab-ci/gitlab-ci.yml +++ /dev/null @@ -1,22 +0,0 @@ -jessie: - tags: - - jessie - before_script: - - pip3 install --upgrade -r tests/requirements.txt - script: - - coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate - - coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py - - coverage html - -stretch: - tags: - - stretch - before_script: - - pip3 install --upgrade -r tests/requirements.txt - script: - - coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate - - coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py - - coverage html - artifacts: - paths: - - htmlcov diff --git a/tests/config_factory.py b/tests/config_factory.py index 83444002c0c99cf33afdfaad358078da2f796b55..160e6db112b68615a362e1ac266f7fa79d6894da 100644 --- a/tests/config_factory.py +++ b/tests/config_factory.py @@ -1,10 +1,13 @@ -import os, configparser +"""Create real temporary ACME dns tiny configurations to run tests with real server""" +import os +import configparser from tempfile import NamedTemporaryFile from subprocess import Popen # domain with server.py running on it for testing DOMAIN = os.getenv("GITLABCI_DOMAIN") -ACMEDIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory") +ACMEDIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2", + "https://acme-staging-v02.api.letsencrypt.org/directory") DNSHOST = os.getenv("GITLABCI_DNSHOST") DNSHOSTIP = os.getenv("GITLABCI_DNSHOSTIP") DNSZONE = os.getenv("GITLABCI_DNSZONE") @@ -15,8 +18,9 @@ TSIGKEYVALUE = os.getenv("GITLABCI_TSIGKEYVALUE") TSIGALGORITHM = os.getenv("GITLABCI_TSIGALGORITHM") CONTACT = os.getenv("GITLABCI_CONTACT") -# generate simple config + def generate_config(): + """Generate basic acme-dns-tiny configuration""" # Account key account_key = NamedTemporaryFile(delete=False) Popen(["openssl", "genrsa", "-out", account_key.name, "2048"]).wait() @@ -25,7 +29,7 @@ def generate_config(): domain_key = NamedTemporaryFile(delete=False) domain_csr = NamedTemporaryFile(delete=False) Popen(["openssl", "req", "-newkey", "rsa:2048", "-nodes", "-keyout", domain_key.name, - "-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr.name]).wait() + "-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr.name]).wait() # acme-dns-tiny configuration parser = configparser.ConfigParser() @@ -33,10 +37,9 @@ def generate_config(): parser["acmednstiny"]["AccountKeyFile"] = account_key.name parser["acmednstiny"]["CSRFile"] = domain_csr.name parser["acmednstiny"]["ACMEDirectory"] = ACMEDIRECTORY - if (CONTACT is not None - and CONTACT != ""): + if CONTACT: parser["acmednstiny"]["Contacts"] = "mailto:{0}".format(CONTACT) - else: + elif "Contacts" in parser: del parser["acmednstiny"]["Contacts"] parser["TSIGKeyring"]["KeyName"] = TSIGKEYNAME parser["TSIGKeyring"]["KeyValue"] = TSIGKEYVALUE @@ -48,60 +51,74 @@ def generate_config(): return account_key.name, domain_key.name, domain_csr.name, parser -# generate account and domain keys -def generate_acme_dns_tiny_config(): + +def generate_acme_dns_tiny_unit_test_config(): + """Genereate acme_dns_tiny configurations used for unit tests""" + # Configuration missing DNS section + _, domain_key, _, config = generate_config() + os.remove(domain_key) + + missing_dns = NamedTemporaryFile(delete=False) + config["DNS"] = {} + with open(missing_dns.name, 'w') as configfile: + config.write(configfile) + + return {"missing_dns": missing_dns.name} + + +def generate_acme_dns_tiny_config(): # pylint: disable=too-many-locals,too-many-statements + """Generate acme_dns_tiny configuration with account and domain keys""" # Simple configuration with good options - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, _, config = generate_config() os.remove(domain_key) - goodCName = NamedTemporaryFile(delete=False) - with open(goodCName.name, 'w') as configfile: + good_cname = NamedTemporaryFile(delete=False) + with open(good_cname.name, 'w') as configfile: config.write(configfile) # Simple configuration with good options, without contacts field - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, _, config = generate_config() os.remove(domain_key) config.remove_option("acmednstiny", "Contacts") - goodCNameWithoutContacts = NamedTemporaryFile(delete=False) - with open(goodCNameWithoutContacts.name, 'w') as configfile: + good_cname_without_contacts = NamedTemporaryFile(delete=False) + with open(good_cname_without_contacts.name, 'w') as configfile: config.write(configfile) # Simple configuration without CSR in configuration (will be passed as argument) - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, cname_csr, config = generate_config() os.remove(domain_key) - cnameCSR = domain_csr config.remove_option("acmednstiny", "CSRFile") - goodCNameWithoutCSR = NamedTemporaryFile(delete=False) - with open(goodCNameWithoutCSR.name, 'w') as configfile: + good_cname_without_csr = NamedTemporaryFile(delete=False) + with open(good_cname_without_csr.name, 'w') as configfile: config.write(configfile) # Configuration with CSR containing a wildcard domain - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, domain_csr, config = generate_config() Popen(["openssl", "req", "-newkey", "rsa:2048", "-nodes", "-keyout", domain_key, "-subj", "/CN=*.{0}".format(DOMAIN), "-out", domain_csr]).wait() os.remove(domain_key) - wildCName = NamedTemporaryFile(delete=False) - with open(wildCName.name, 'w') as configfile: + wild_cname = NamedTemporaryFile(delete=False) + with open(wild_cname.name, 'w') as configfile: config.write(configfile) # Configuration with IP as DNS Host - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, _, config = generate_config() os.remove(domain_key) config["DNS"]["Host"] = DNSHOSTIP - dnsHostIP = NamedTemporaryFile(delete=False) - with open(dnsHostIP.name, 'w') as configfile: + dns_host_ip = NamedTemporaryFile(delete=False) + with open(dns_host_ip.name, 'w') as configfile: config.write(configfile) # Configuration with CSR using subject alt-name domain instead of CN (common name) - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, domain_csr, config = generate_config() san_conf = NamedTemporaryFile(delete=False) with open("/etc/ssl/openssl.cnf", 'r') as opensslcnf: @@ -109,123 +126,114 @@ def generate_acme_dns_tiny_config(): san_conf.write("\n[SAN]\nsubjectAltName=DNS:{0},DNS:www.{0}\n".format(DOMAIN).encode("utf8")) san_conf.seek(0) Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key, - "-subj", "/", "-reqexts", "SAN", "-config", san_conf.name, - "-out", domain_csr]).wait() + "-subj", "/", "-reqexts", "SAN", "-config", san_conf.name, + "-out", domain_csr]).wait() os.remove(san_conf.name) os.remove(domain_key) - goodSAN = NamedTemporaryFile(delete=False) - with open(goodSAN.name, 'w') as configfile: + good_san = NamedTemporaryFile(delete=False) + with open(good_san.name, 'w') as configfile: config.write(configfile) - # Configuration with CSR containing a wildcard domain inside subjetcAltName - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, domain_csr, config = generate_config() - wildsan_conf = NamedTemporaryFile(delete=False) + wild_san_conf = NamedTemporaryFile(delete=False) with open("/etc/ssl/openssl.cnf", 'r') as opensslcnf: - wildsan_conf.write(opensslcnf.read().encode("utf8")) - wildsan_conf.write("\n[SAN]\nsubjectAltName=DNS:{0},DNS:*.{0}\n".format(DOMAIN).encode("utf8")) - wildsan_conf.seek(0) + wild_san_conf.write(opensslcnf.read().encode("utf8")) + wild_san_conf.write("\n[SAN]\nsubjectAltName=DNS:{0},DNS:*.{0}\n".format(DOMAIN).encode("utf8")) + wild_san_conf.seek(0) Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key, - "-subj", "/", "-reqexts", "SAN", "-config", wildsan_conf.name, + "-subj", "/", "-reqexts", "SAN", "-config", wild_san_conf.name, "-out", domain_csr]).wait() - os.remove(wildsan_conf.name) + os.remove(wild_san_conf.name) os.remove(domain_key) - wildSAN = NamedTemporaryFile(delete=False) - with open(wildSAN.name, 'w') as configfile: + wild_san = NamedTemporaryFile(delete=False) + with open(wild_san.name, 'w') as configfile: config.write(configfile) # Bad configuration with weak 1024 bit account key - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, _, config = generate_config() os.remove(domain_key) Popen(["openssl", "genrsa", "-out", account_key, "1024"]).wait() - weakKey = NamedTemporaryFile(delete=False) - with open(weakKey.name, 'w') as configfile: + weak_key = NamedTemporaryFile(delete=False) + with open(weak_key.name, 'w') as configfile: config.write(configfile) # Bad configuration with account key as domain key - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, domain_csr, config = generate_config() os.remove(domain_key) # Create a new CSR signed with the account key instead of domain key Popen(["openssl", "req", "-new", "-sha256", "-key", account_key, - "-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr]).wait() + "-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr]).wait() - accountAsDomain = NamedTemporaryFile(delete=False) - with open(accountAsDomain.name, 'w') as configfile: + account_as_domain = NamedTemporaryFile(delete=False) + with open(account_as_domain.name, 'w') as configfile: config.write(configfile) # Create config parser from the good default config to generate custom configs - account_key, domain_key, domain_csr, config = generate_config(); + account_key, domain_key, _, config = generate_config() os.remove(domain_key) - invalidTSIGName = NamedTemporaryFile(delete=False) + invalid_tsig_name = NamedTemporaryFile(delete=False) config["TSIGKeyring"]["KeyName"] = "{0}.invalid".format(TSIGKEYNAME) - with open(invalidTSIGName.name, 'w') as configfile: - config.write(configfile) - - # Create config parser from the good default config to generate custom configs - account_key, domain_key, domain_csr, config = generate_config(); - os.remove(domain_key) - - missingDNS = NamedTemporaryFile(delete=False) - config["DNS"] = {} - with open(missingDNS.name, 'w') as configfile: + with open(invalid_tsig_name.name, 'w') as configfile: config.write(configfile) return { # configs - "goodCName": goodCName.name, - "goodCNameWithoutContacts": goodCNameWithoutContacts.name, - "goodCNameWithoutCSR": goodCNameWithoutCSR.name, - "wildCName": wildCName.name, - "dnsHostIP": dnsHostIP.name, - "goodSAN": goodSAN.name, - "wildSAN": wildSAN.name, - "weakKey": weakKey.name, - "accountAsDomain": accountAsDomain.name, - "invalidTSIGName": invalidTSIGName.name, - "missingDNS": missingDNS.name, - # CName CSR file to use with goodCNameWithoutCSR as argument - "cnameCSR": domain_csr, + "good_cname": good_cname.name, + "good_cname_without_contacts": good_cname_without_contacts.name, + "good_cname_without_csr": good_cname_without_csr.name, + "wild_cname": wild_cname.name, + "dns_host_ip": dns_host_ip.name, + "good_san": good_san.name, + "wild_san": wild_san.name, + "weak_key": weak_key.name, + "account_as_domain": account_as_domain.name, + "invalid_tsig_name": invalid_tsig_name.name, + # cname CSR file to use with good_cname_without_csr as argument + "cname_csr": cname_csr, } -# generate two account keys to roll over them + def generate_acme_account_rollover_config(): - # Old account is directly created by the config generator - old_account_key, domain_key, domain_csr, config = generate_config() + """Generate config for acme_account_rollover script""" + # Old account key is directly created by the config generator + old_account_key, domain_key, _, config = generate_config() os.remove(domain_key) # New account key new_account_key = NamedTemporaryFile(delete=False) Popen(["openssl", "genrsa", "-out", new_account_key.name, "2048"]).wait() - rolloverAccount = NamedTemporaryFile(delete=False) - with open(rolloverAccount.name, 'w') as configfile: + rollover_account = NamedTemporaryFile(delete=False) + with open(rollover_account.name, 'w') as configfile: config.write(configfile) return { # config and keys (returned to keep files on system) - "config": rolloverAccount.name, - "oldaccountkey": old_account_key, - "newaccountkey": new_account_key.name + "config": rollover_account.name, + "old_account_key": old_account_key, + "new_account_key": new_account_key.name } -# generate an account key to delete it + def generate_acme_account_deactivate_config(): + """Generate config for acme_account_deactivate script""" # Account key is created by the by the config generator - account_key, domain_key, domain_csr, config = generate_config() + account_key, domain_key, _, config = generate_config() os.remove(domain_key) - deactivateAccount = NamedTemporaryFile(delete=False) - with open(deactivateAccount.name, 'w') as configfile: + deactivate_account = NamedTemporaryFile(delete=False) + with open(deactivate_account.name, 'w') as configfile: config.write(configfile) return { - "config": deactivateAccount.name, + "config": deactivate_account.name, "key": account_key } diff --git a/tests/staging_test_acme_account_deactivate.py b/tests/staging_test_acme_account_deactivate.py new file mode 100644 index 0000000000000000000000000000000000000000..10f89ac8d73ec8752b9ce57f5262c3b75482582c --- /dev/null +++ b/tests/staging_test_acme_account_deactivate.py @@ -0,0 +1,59 @@ +"""Test acme_account_deactivate script with real ACME server""" +import unittest +import os +import configparser +import acme_dns_tiny +from tests.config_factory import generate_acme_account_deactivate_config +import tools.acme_account_deactivate + +ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2", + "https://acme-staging-v02.api.letsencrypt.org/directory") + + +class TestACMEAccountDeactivate(unittest.TestCase): + """Tests for acme_account_deactivate.""" + + @classmethod + def setUpClass(cls): + cls.configs = generate_acme_account_deactivate_config() + try: + acme_dns_tiny.main([cls.configs['config']]) + except ValueError as err: + if str(err).startswith("Error register"): + raise ValueError("Fail test as account has not been registered correctly: {0}" + .format(err)) + + super(TestACMEAccountDeactivate, cls).setUpClass() + + # To clean ACME staging server and close correctly temporary files + # pylint: disable=bare-except + @classmethod + def tearDownClass(cls): + # Remove temporary files + parser = configparser.ConfigParser() + parser.read(cls.configs['config']) + try: + os.remove(parser["acmednstiny"]["AccountKeyFile"]) + except: + pass + try: + os.remove(parser["acmednstiny"]["CSRFile"]) + except: + pass + try: + os.remove(cls.configs['config']) + except: + pass + super(TestACMEAccountDeactivate, cls).tearDownClass() + + def test_success_account_deactivate(self): + """ Test success account key deactivate """ + with self.assertLogs(level='INFO') as accountdeactivatelog: + tools.acme_account_deactivate.main(["--account-key", self.configs['key'], + "--acme-directory", ACME_DIRECTORY]) + self.assertIn("INFO:acme_account_deactivate:The account has been deactivated.", + accountdeactivatelog.output) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() diff --git a/tests/staging_test_acme_account_rollover.py b/tests/staging_test_acme_account_rollover.py new file mode 100644 index 0000000000000000000000000000000000000000..3448761e2fe1c07efd00f95582b11146a4e16cca --- /dev/null +++ b/tests/staging_test_acme_account_rollover.py @@ -0,0 +1,64 @@ +"""Test acme_account_rollover script with real ACME server""" +import unittest +import os +import configparser +import acme_dns_tiny +from tests.config_factory import generate_acme_account_rollover_config +from tools.acme_account_deactivate import account_deactivate +import tools.acme_account_rollover + +ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2", + "https://acme-staging-v02.api.letsencrypt.org/directory") + + +class TestACMEAccountRollover(unittest.TestCase): + """Tests for acme_account_rollover.""" + + @classmethod + def setUpClass(cls): + cls.configs = generate_acme_account_rollover_config() + acme_dns_tiny.main([cls.configs['config']]) + super(TestACMEAccountRollover, cls).setUpClass() + + # To clean ACME staging server and close correctly temporary files + # pylint: disable=bare-except + @classmethod + def tearDownClass(cls): + # Remove temporary files + parser = configparser.ConfigParser() + parser.read(cls.configs['config']) + try: + # deactivate account key registration at end of tests + # (we assume the key has been rolled over) + account_deactivate(cls.configs["new_account_key"], ACME_DIRECTORY) + except: + pass + try: + os.remove(parser["acmednstiny"]["AccountKeyFile"]) + except: + pass + try: + os.remove(parser["acmednstiny"]["CSRFile"]) + except: + pass + try: + os.remove(cls.configs["new_account_key"]) + except: + pass + try: + os.remove(cls.configs['config']) + except: + pass + super(TestACMEAccountRollover, cls).tearDownClass() + + def test_success_account_rollover(self): + """ Test success account key rollover.""" + with self.assertLogs(level='INFO') as accountrolloverlog: + tools.acme_account_rollover.main(["--current", self.configs['old_account_key'], + "--new", self.configs['new_account_key'], + "--acme-directory", ACME_DIRECTORY]) + self.assertIn("INFO:acme_account_rollover:Keys rolled over.", accountrolloverlog.output) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() diff --git a/tests/staging_test_acme_dns_tiny.py b/tests/staging_test_acme_dns_tiny.py new file mode 100644 index 0000000000000000000000000000000000000000..346619c14b44a470165a44b51ba83b388d779c65 --- /dev/null +++ b/tests/staging_test_acme_dns_tiny.py @@ -0,0 +1,219 @@ +"""Tests for acme_dns_tiny script to be run with real ACME server""" +import unittest +import sys +import os +import subprocess +import configparser +from io import StringIO +import dns.version +import acme_dns_tiny +from tests.config_factory import generate_acme_dns_tiny_config +from tools.acme_account_deactivate import account_deactivate + +ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2", + "https://acme-staging-v02.api.letsencrypt.org/directory") + + +def _openssl(command, options, communicate=None): + """Helper function to run openssl command.""" + openssl = subprocess.Popen(["openssl", command] + options, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = openssl.communicate(communicate) + if openssl.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + return out.decode("utf8") + + +class TestACMEDNSTiny(unittest.TestCase): + """Tests for acme_dns_tiny.get_crt().""" + + @classmethod + def setUpClass(cls): + print("Init acme_dns_tiny with python modules:") + print(" - python: {0}".format(sys.version)) + print(" - dns python: {0}".format(dns.version.version)) + cls.configs = generate_acme_dns_tiny_config() + sys.stdout.flush() + super(TestACMEDNSTiny, cls).setUpClass() + + # To clean ACME staging server and close correctly temporary files + # pylint: disable=bare-except + @classmethod + def tearDownClass(cls): + # close temp files correctly + for conffile in cls.configs: + # for each configuration file, deactivate the account and remove linked temporary files + if conffile != "cname_csr": + parser = configparser.ConfigParser() + parser.read(cls.configs[conffile]) + try: + account_deactivate(parser["acmednstiny"]["AccountKeyFile"], ACME_DIRECTORY) + except: + pass + try: + os.remove(parser["acmednstiny"]["AccountKeyFile"]) + except: + pass + try: + os.remove(parser["acmednstiny"]["CSRFile"]) + except: + pass + try: + os.remove(cls.configs[conffile]) + except: + pass + super(TestACMEDNSTiny, cls).tearDownClass() + + # helper function to valid success by making assertion on returned certificate chain + def _assert_certificate_chain(self, cert_chain): + # Output have to contains two certificates + certlist = cert_chain.split("-----BEGIN CERTIFICATE-----") + self.assertEqual(3, len(certlist)) + self.assertEqual('', certlist[0]) + self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[1]) + self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[2]) + # Use openssl to check validity of chain and simple test of readability + readablecertchain = _openssl("x509", ["-text", "-noout"], + cert_chain.encode("utf8")) + self.assertIn("Issuer", readablecertchain) + + def test_success_cn(self): + """Successfully issue a certificate via common name.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main([self.configs['good_cname'], "--verbose"]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_cn_without_contacts(self): + """Successfully issue a certificate via CN, but without Contacts field.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main([self.configs['good_cname_without_contacts'], "--verbose"]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_cn_with_csr_option(self): + """Successfully issue a certificate using CSR option outside from the config file.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main(["--csr", self.configs['cname_csr'], + self.configs['good_cname_without_csr'], "--verbose"]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_wild_cn(self): + """Successfully issue a certificate via a wildcard common name.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main([self.configs['wild_cname'], "--verbose"]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_dnshost_ip(self): + """When DNS Host is an IP, DNS resolution have to fail without error.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + with self.assertLogs(level='INFO') as adnslog: + acme_dns_tiny.main([self.configs['dns_host_ip'], + "--verbose"]) + self.assertIn(("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured " + "dns host: we will use either resource found if one exists or directly the " + "DNS Host configuration."), adnslog.output) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_san(self): + """Successfully issue a certificate via subject alt name.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main([self.configs['good_san'], "--verbose"]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_wildsan(self): + """Successfully issue a certificate via wildcard in subject alt name.""" + old_stdout = sys.stdout + sys.stdout = StringIO() + + acme_dns_tiny.main([self.configs['wild_san']]) + certchain = sys.stdout.getvalue() + + sys.stdout.close() + sys.stdout = old_stdout + + self._assert_certificate_chain(certchain) + + def test_success_cli(self): + """Successfully issue a certificate via command line interface.""" + certout, _ = subprocess.Popen([ + "python3", "acme_dns_tiny.py", self.configs['good_cname'], "--verbose" + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() + + certchain = certout.decode("utf8") + + self._assert_certificate_chain(certchain) + + def test_success_cli_with_csr_option(self): + """Successfully issue a certificate via command line interface using CSR option.""" + certout, _ = subprocess.Popen([ + "python3", "acme_dns_tiny.py", "--csr", self.configs['cname_csr'], + self.configs['good_cname_without_csr'], "--verbose" + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() + + certchain = certout.decode("utf8") + + self._assert_certificate_chain(certchain) + + def test_weak_key(self): + """Let's Encrypt rejects weak keys.""" + self.assertRaisesRegex(ValueError, + "key too small", + acme_dns_tiny.main, [self.configs['weak_key'], "--verbose"]) + + def test_account_key_domain(self): + """Can't use the account key for the CSR.""" + self.assertRaisesRegex(ValueError, + "certificate public key must be different than account key", + acme_dns_tiny.main, [self.configs['account_as_domain'], "--verbose"]) + + def test_failure_dns_update_tsigkeyname(self): + """Fail to update DNS records by invalid TSIG Key name.""" + self.assertRaisesRegex(ValueError, + "Error updating DNS", + acme_dns_tiny.main, [self.configs['invalid_tsig_name'], "--verbose"]) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() diff --git a/tests/test_acme_account_deactivate.py b/tests/test_acme_account_deactivate.py deleted file mode 100644 index 19569ceb078f27e5dc53a109332a26c4a94d7ae0..0000000000000000000000000000000000000000 --- a/tests/test_acme_account_deactivate.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest, os, time, configparser -import acme_dns_tiny -from tests.config_factory import generate_acme_account_deactivate_config -import tools.acme_account_deactivate - -ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory") - -class TestACMEAccountDeactivate(unittest.TestCase): - "Tests for acme_account_deactivate" - - @classmethod - def setUpClass(self): - self.configs = generate_acme_account_deactivate_config() - try: - acme_dns_tiny.main([self.configs['config']]) - except ValueError as err: - if str(err).startswith("Error register"): - raise ValueError("Fail test as account has not been registered correctly: {0}".format(err)) - - super(TestACMEAccountDeactivate, self).setUpClass() - - # To clean ACME staging server and close correctly temporary files - @classmethod - def tearDownClass(self): - # Remove temporary files - parser = configparser.ConfigParser() - parser.read(self.configs['config']) - try: - os.remove(parser["acmednstiny"]["AccountKeyFile"]) - os.remove(parser["acmednstiny"]["CSRFile"]) - os.remove(self.configs['config']) - except: - pass - super(TestACMEAccountDeactivate, self).tearDownClass() - - def test_success_account_deactivate(self): - """ Test success account key deactivate """ - with self.assertLogs(level='INFO') as accountdeactivatelog: - tools.acme_account_deactivate.main(["--account-key", self.configs['key'], - "--acme-directory", ACMEDirectory]) - self.assertIn("INFO:acme_account_deactivate:Account key deactivated !", - accountdeactivatelog.output) - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_acme_account_rollover.py b/tests/test_acme_account_rollover.py deleted file mode 100644 index 05f88ce61ad1c4c6e70d33d8f29c43c7bac955a6..0000000000000000000000000000000000000000 --- a/tests/test_acme_account_rollover.py +++ /dev/null @@ -1,46 +0,0 @@ -import unittest, os, time, configparser -import acme_dns_tiny -from tests.config_factory import generate_acme_account_rollover_config -from tools.acme_account_deactivate import account_deactivate -import tools.acme_account_rollover - -ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory") - -class TestACMEAccountRollover(unittest.TestCase): - "Tests for acme_account_rollover" - - @classmethod - def setUpClass(self): - self.configs = generate_acme_account_rollover_config() - acme_dns_tiny.main([self.configs['config']]) - super(TestACMEAccountRollover, self).setUpClass() - - # To clean ACME staging server and close correctly temporary files - @classmethod - def tearDownClass(self): - # deactivate account key registration at end of tests - # (we assume the key has been roll oved) - account_deactivate(self.configs["newaccountkey"], ACMEDirectory) - # Remove temporary files - parser = configparser.ConfigParser() - parser.read(self.configs['config']) - try: - os.remove(parser["acmednstiny"]["AccountKeyFile"]) - os.remove(parser["acmednstiny"]["CSRFile"]) - os.remove(self.configs["newaccountkey"]) - os.remove(self.configs['config']) - except: - pass - super(TestACMEAccountRollover, self).tearDownClass() - - def test_success_account_rollover(self): - """ Test success account key rollover """ - with self.assertLogs(level='INFO') as accountrolloverlog: - tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'], - "--new", self.configs['newaccountkey'], - "--acme-directory", ACMEDirectory]) - self.assertIn("INFO:acme_account_rollover:Account keys rolled over !", - accountrolloverlog.output) - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py deleted file mode 100644 index 7b245a150ff6e404de58fa896ccaa27996bb812c..0000000000000000000000000000000000000000 --- a/tests/test_acme_dns_tiny.py +++ /dev/null @@ -1,200 +0,0 @@ -import unittest, sys, os, subprocess, time, configparser -from io import StringIO -import dns.version -import acme_dns_tiny -from tests.config_factory import generate_acme_dns_tiny_config -from tools.acme_account_deactivate import account_deactivate - -ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory") - -class TestACMEDNSTiny(unittest.TestCase): - "Tests for acme_dns_tiny.get_crt()" - - @classmethod - def setUpClass(self): - print("Init acme_dns_tiny with python modules:") - print(" - python: {0}".format(sys.version)) - print(" - dns python: {0}".format(dns.version.version)) - self.configs = generate_acme_dns_tiny_config() - sys.stdout.flush() - super(TestACMEDNSTiny, self).setUpClass() - - # To clean ACME staging server and close correctly temporary files - @classmethod - def tearDownClass(self): - # close temp files correctly - for conffile in self.configs: - parser = configparser.ConfigParser() - parser.read(conffile) - try: - os.remove(parser["acmednstiny"]["AccountKeyFile"]) - os.remove(parser["acmednstiny"]["CSRFile"]) - # for each configuraiton, deactivate the account key - if conffile != "cnameCSR": - account_deactivate(parser["acmednstiny"]["AccountKeyFile"], ACMEDirectory) - os.remove(conffile) - except: - pass - super(TestACMEDNSTiny, self).tearDownClass() - - # helper function to run openssl command - def openssl(self, command, options, communicate=None): - openssl = subprocess.Popen(["openssl", command] + options, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = openssl.communicate(communicate) - if openssl.returncode != 0: - raise IOError("OpenSSL Error: {0}".format(err)) - return out.decode("utf8") - - # helper function to valid success by making assertion on returned certificate chain - def assertCertificateChain(self, certificateChain): - # Output have to contains two certiicates - certlist = certificateChain.split("-----BEGIN CERTIFICATE-----") - self.assertEqual(3, len(certlist)) - self.assertEqual('', certlist[0]) - self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[1]) - self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[2]) - # Use openssl to check validity of chain and simple test of readability - readablecertchain = self.openssl("x509", ["-text", "-noout"], certificateChain.encode("utf8")) - self.assertIn("Issuer", readablecertchain) - - def test_success_cn(self): - """ Successfully issue a certificate via common name """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main([self.configs['goodCName'], "--verbose"]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_cn_without_contacts(self): - """ Successfully issue a certificate via CN, but without Contacts field """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main([self.configs['goodCNameWithoutContacts'], "--verbose"]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_cn_with_csr_option(self): - """ Successfully issue a certificate using CSR option outside from the config file""" - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main(["--csr", self.configs['cnameCSR'], self.configs['goodCNameWithoutCSR'], "--verbose"]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_wild_cn(self): - """ Successfully issue a certificate via a wildcard common name """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main([self.configs['wildCName'], "--verbose"]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_dnshost_ip(self): - """ When DNS Host is an IP, DNS resolution have to fail without error """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - with self.assertLogs(level='INFO') as adnslog: - acme_dns_tiny.main([self.configs['dnsHostIP'], "--verbose"]) - self.assertIn("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if one exists or directly the DNS Host configuration.", - adnslog.output) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_san(self): - """ Successfully issue a certificate via subject alt name """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main([self.configs['goodSAN'], "--verbose"]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_wildsan(self): - """ Successfully issue a certificate via wildcard in subject alt name """ - old_stdout = sys.stdout - sys.stdout = StringIO() - - acme_dns_tiny.main([self.configs['wildSAN']]) - certchain = sys.stdout.getvalue() - - sys.stdout.close() - sys.stdout = old_stdout - - self.assertCertificateChain(certchain) - - def test_success_cli(self): - """ Successfully issue a certificate via command line interface """ - certout, err = subprocess.Popen([ - "python3", "acme_dns_tiny.py", self.configs['goodCName'], "--verbose" - ], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() - - certchain = certout.decode("utf8") - - self.assertCertificateChain(certchain) - - def test_success_cli_with_csr_option(self): - """ Successfully issue a certificate via command line interface using CSR option""" - certout, err = subprocess.Popen([ - "python3", "acme_dns_tiny.py", "--csr", self.configs['cnameCSR'], self.configs['goodCNameWithoutCSR'], "--verbose" - ], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() - - certchain = certout.decode("utf8") - - self.assertCertificateChain(certchain) - - def test_weak_key(self): - """ Let's Encrypt rejects weak keys """ - self.assertRaisesRegex(ValueError, - "key too small", - acme_dns_tiny.main, [self.configs['weakKey'], "--verbose"]) - - def test_account_key_domain(self): - """ Can't use the account key for the CSR """ - self.assertRaisesRegex(ValueError, - "certificate public key must be different than account key", - acme_dns_tiny.main, [self.configs['accountAsDomain'], "--verbose"]) - - def test_failure_dns_update_tsigkeyname(self): - """ Fail to update DNS records by invalid TSIG Key name """ - self.assertRaisesRegex(ValueError, - "Error updating DNS", - acme_dns_tiny.main, [self.configs['invalidTSIGName'], "--verbose"]) - - def test_failure_notcompleted_configuration(self): - """ Configuration file have to be completed """ - self.assertRaisesRegex(ValueError, - "Some required settings are missing\.", - acme_dns_tiny.main, [self.configs['missingDNS'], "--verbose"]) - -if __name__ == "__main__": - unittest.main() diff --git a/tests/unit_test_acme_dns_tiny.py b/tests/unit_test_acme_dns_tiny.py new file mode 100644 index 0000000000000000000000000000000000000000..9fdb3d7bc0569212882f246580286f8b2e53c10b --- /dev/null +++ b/tests/unit_test_acme_dns_tiny.py @@ -0,0 +1,42 @@ +"""Unit tests for the acme_dns_tiny script""" +import unittest +import sys +import os +import configparser +import dns.version +import acme_dns_tiny +from tests.config_factory import generate_acme_dns_tiny_unit_test_config + + +class TestACMEDNSTiny(unittest.TestCase): + "Tests for acme_dns_tiny.get_crt()" + + @classmethod + def setUpClass(cls): + print("Init acme_dns_tiny with python modules:") + print(" - python: {0}".format(sys.version)) + print(" - dns python: {0}".format(dns.version.version)) + cls.configs = generate_acme_dns_tiny_unit_test_config() + sys.stdout.flush() + super(TestACMEDNSTiny, cls).setUpClass() + + # Close correctly temporary files + @classmethod + def tearDownClass(cls): + # close temp files correctly + for conffile in cls.configs: + parser = configparser.ConfigParser() + parser.read(cls.configs[conffile]) + os.remove(parser["acmednstiny"]["AccountKeyFile"]) + os.remove(parser["acmednstiny"]["CSRFile"]) + os.remove(cls.configs[conffile]) + super(TestACMEDNSTiny, cls).tearDownClass() + + def test_failure_notcompleted_configuration(self): + """ Configuration file have to be completed """ + self.assertRaisesRegex(ValueError, r"Some required settings are missing.", + acme_dns_tiny.main, [self.configs['missing_dns'], "--verbose"]) + + +if __name__ == "__main__": # pragma: no cover + unittest.main() diff --git a/tools/acme_account_deactivate.py b/tools/acme_account_deactivate.py index 845a6c1f522002d65e7564eec33e91985e3365f0..eb687fecb6b84aa49ee2de7aec19d8e01f64ad25 100644 --- a/tools/acme_account_deactivate.py +++ b/tools/acme_account_deactivate.py @@ -1,116 +1,139 @@ #!/usr/bin/env python3 -import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, requests +"""Tiny script to deactivate account on an ACME server.""" +import sys +import argparse +import subprocess +import json +import base64 +import binascii +import re +import copy +import logging +import requests LOGGER = logging.getLogger("acme_account_deactivate") LOGGER.addHandler(logging.StreamHandler()) + +def _b64(text): + """Encodes text as base64 as specified in ACME RFC.""" + return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") + + +def _openssl(command, options, communicate=None): + """Run openssl command line and raise IOError on non-zero return.""" + openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = openssl.communicate(communicate) + if openssl.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + return out + + def account_deactivate(accountkeypath, acme_directory, log=LOGGER): - def _b64(b): - """"Encodes string as base64 as specified in ACME RFC """ - return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") - - def _openssl(command, options, communicate=None): - """Run openssl command line and raise IOError on non-zero return.""" - openssl = subprocess.Popen(["openssl", command] + options, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = openssl.communicate(communicate) - if openssl.returncode != 0: - raise IOError("OpenSSL Error: {0}".format(err)) - return out + """Deactivate an ACME account.""" def _send_signed_request(url, payload): """Sends signed requests to ACME server.""" - nonlocal jws_nonce - if payload == "": # on POST-as-GET, final payload has to be just empty string + nonlocal nonce + if payload == "": # on POST-as-GET, final payload has to be just empty string payload64 = "" else: payload64 = _b64(json.dumps(payload).encode("utf8")) - protected = copy.deepcopy(jws_header) - protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] + protected = copy.deepcopy(private_acme_signature) + protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] protected["url"] = url if url == acme_config["newAccount"]: - del protected["kid"] + if "kid" in protected: + del protected["kid"] else: del protected["jwk"] protected64 = _b64(json.dumps(protected).encode("utf8")) signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath], "{0}.{1}".format(protected64, payload64).encode("utf8")) jose = { - "protected": protected64, "payload": payload64,"signature": _b64(signature) + "protected": protected64, "payload": payload64, "signature": _b64(signature) + } + joseheaders = { + 'User-Agent': adtheaders.get('User-Agent'), + 'Content-Type': 'application/jose+json' } try: response = requests.post(url, json=jose, headers=joseheaders) except requests.exceptions.RequestException as error: response = error.response finally: - jws_nonce = response.headers['Replay-Nonce'] - try: - return response, response.json() - except ValueError as error: - return response, json.dumps({}) + nonce = response.headers['Replay-Nonce'] + try: + return response, response.json() + except ValueError: # if body is empty or not JSON formatted + return response, json.dumps({}) # main code - adtheaders = {'User-Agent': 'acme-dns-tiny/2.1'} - joseheaders = copy.deepcopy(adtheaders) - joseheaders['Content-Type'] = 'application/jose+json' + adtheaders = {'User-Agent': 'acme-dns-tiny/2.2'} + nonce = None log.info("Fetch informations from the ACME directory.") - directory = requests.get(acme_directory, headers=adtheaders) - acme_config = directory.json() + acme_config = requests.get(acme_directory, headers=adtheaders).json() - log.info("Parsing account key.") + log.info("Get private signature from account key.") accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) pub_hex, pub_exp = re.search( r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp - jws_header = { + # That signature is used to authenticate with the ACME server, it needs to be safely kept + private_acme_signature = { "alg": "RS256", "jwk": { "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), }, - "kid": None, } - jws_nonce = None - - log.info("Ask CA provider account url.") - account_request = {} - account_request["onlyReturnExisting"] = True - http_response, result = _send_signed_request(acme_config["newAccount"], account_request) + log.info("Ask to the ACME server the account identifier to complete the private signature.") + http_response, result = _send_signed_request(acme_config["newAccount"], + {"onlyReturnExisting": True}) if http_response.status_code == 200: - jws_header["kid"] = http_response.headers['Location'] + private_acme_signature["kid"] = http_response.headers['Location'] else: - raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error looking or account URL: {0} {1}" + .format(http_response.status_code, result)) - log.info("Deactivating account...") - http_response, result = _send_signed_request(jws_header["kid"], {"status": "deactivated"}) + log.info("Deactivating the account.") + http_response, result = _send_signed_request(private_acme_signature["kid"], + {"status": "deactivated"}) if http_response.status_code == 200: - log.info("Account key deactivated !") + log.info("The account has been deactivated.") else: - raise ValueError("Error while deactivating the account key: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error while deactivating the account key: {0} {1}" + .format(http_response.status_code, result)) + def main(argv): + """Parse arguments and deactivate account.""" parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, - description="Tiny ACME client to deactivate ACME account", + description="Tiny ACME script to deactivate an ACME account", epilog="""This script permanently *deactivates* an ACME account. -You should revoke your certificates *before* using this script, -as the server won't accept any further request with this account. +You should revoke all TLS certificates linked to the account *before* using this script, +as the server won't accept any further request when account is deactivated. It will need to access the ACME private account key, so PLEASE READ THROUGH IT! It's around 150 lines, so it won't take long. Example: deactivate account.key from staging Let's Encrypt: - python3 acme_account_deactivate.py --account-key account.key --acme-directory https://acme-staging-v02.api.letsencrypt.org/directory""" + python3 acme_account_deactivate.py --account-key account.key --acme-directory \ +https://acme-staging-v02.api.letsencrypt.org/directory""" ) - parser.add_argument("--account-key", required=True, help="path to the private account key to deactivate") - parser.add_argument("--acme-directory", required=True, help="ACME directory URL of the ACME server where to remove the key") + parser.add_argument("--account-key", required=True, + help="path to the private account key to deactivate") + parser.add_argument("--acme-directory", required=True, + help="ACME directory URL of the ACME server where to remove the key") parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") @@ -119,5 +142,6 @@ Example: deactivate account.key from staging Let's Encrypt: LOGGER.setLevel(args.quiet or logging.INFO) account_deactivate(args.account_key, args.acme_directory, log=LOGGER) + if __name__ == "__main__": # pragma: no cover main(sys.argv[1:]) diff --git a/tools/acme_account_rollover.py b/tools/acme_account_rollover.py index 66d589a8e34f89c715458b9ec1663cd4331d2588..9c1cd9d8b3735ccfece9bbf6a3450d5bb032620b 100644 --- a/tools/acme_account_rollover.py +++ b/tools/acme_account_rollover.py @@ -1,129 +1,147 @@ #!/usr/bin/env python3 -import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, requests +# pylint: disable=too-many-statements +"""Tiny script to rollover two keys for an ACME account""" +import sys +import argparse +import subprocess +import json +import base64 +import binascii +import re +import copy +import logging +import requests LOGGER = logging.getLogger("acme_account_rollover") LOGGER.addHandler(logging.StreamHandler()) + +def _b64(text): + """Encodes text as base64 as specified in ACME RFC.""" + return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") + + +def _openssl(command, options, communicate=None): + """Run openssl command line and raise IOError on non-zero return.""" + openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = openssl.communicate(communicate) + if openssl.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + return out + + def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER): - def _b64(b): - """"Encodes string as base64 as specified in ACME RFC """ - return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") - - def _openssl(command, options, communicate=None): - """Run openssl command line and raise IOError on non-zero return.""" - openssl = subprocess.Popen(["openssl", command] + options, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = openssl.communicate(communicate) - if openssl.returncode != 0: - raise IOError("OpenSSL Error: {0}".format(err)) - return out - - def _jws_header(accountkeypath): - """Creates a JWS header according to a specific account key path.""" + """Rollover the old and new account key for an ACME account.""" + def _get_private_acme_signature(accountkeypath): + """Read the account key to get the signature to authenticate with the ACME server.""" accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) pub_hex, pub_exp = re.search( r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp - jws_header = { + return { "alg": "RS256", "jwk": { "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), "kty": "RSA", "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), }, - "kid": None } - return jws_header - def _sign_request(url, keypath, payload, is_inner = False): + def _sign_request(url, keypath, payload, is_inner=False): """Signs request with a specific right account key.""" - nonlocal jws_nonce - if payload == "": # on POST-as-GET, final payload has to be just empty string + nonlocal nonce + if payload == "": # on POST-as-GET, final payload has to be just empty string payload64 = "" else: payload64 = _b64(json.dumps(payload).encode("utf8")) if keypath == new_accountkeypath: - protected = copy.deepcopy(new_jws_header) + protected = copy.deepcopy(private_acme_new_signature) elif keypath == old_accountkeypath: - protected = copy.deepcopy(old_jws_header) + protected = copy.deepcopy(private_acme_old_signature) if is_inner or url == acme_config["newAccount"]: - del protected["kid"] + if "kid" in protected: + del protected["kid"] else: del protected["jwk"] if not is_inner: - protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] + protected["nonce"] = (nonce + or requests.get(acme_config["newNonce"]) + .headers['Replay-Nonce']) protected["url"] = url protected64 = _b64(json.dumps(protected).encode("utf8")) signature = _openssl("dgst", ["-sha256", "-sign", keypath], "{0}.{1}".format(protected64, payload64).encode("utf8")) - signedjws = { - "protected": protected64, "payload": payload64,"signature": _b64(signature) + return { + "protected": protected64, "payload": payload64, "signature": _b64(signature) } - return signedjws def _send_signed_request(url, keypath, payload): """Sends signed requests to ACME server.""" - nonlocal jws_nonce + nonlocal nonce jose = _sign_request(url, keypath, payload) + joseheaders = { + 'User-Agent': adtheaders.get('User-Agent'), + 'Content-Type': 'application/jose+json' + } try: response = requests.post(url, json=jose, headers=joseheaders) except requests.exceptions.RequestException as error: response = error.response finally: - jws_nonce = response.headers['Replay-Nonce'] - try: - return response, response.json() - except ValueError as error: - return response, json.dumps({}) + nonce = response.headers['Replay-Nonce'] + try: + return response, response.json() + except ValueError: # if body is empty or not JSON formatted + return response, json.dumps({}) # main code - adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'} - joseheaders=copy.deepcopy(adtheaders) - joseheaders['Content-Type']='application/jose+json' + adtheaders = {'User-Agent': 'acme-dns-tiny/2.2'} + nonce = None log.info("Fetch informations from the ACME directory.") - directory = requests.get(acme_directory, headers=adtheaders) - acme_config = directory.json() - - log.info("Parsing current account key...") - old_jws_header = _jws_header(old_accountkeypath) + acme_config = requests.get(acme_directory, headers=adtheaders).json() - log.info("Parsing new account key...") - new_jws_header = _jws_header(new_accountkeypath) - del new_jws_header["kid"] + log.info("Get private signature from old account key.") + private_acme_old_signature = _get_private_acme_signature(old_accountkeypath) - jws_nonce = None + log.info("Get private signature from new account key.") + private_acme_new_signature = _get_private_acme_signature(new_accountkeypath) - log.info("Ask CA provider account url.") + log.info("Ask to the ACME server the account identifier to complete the private signature.") http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, { - "onlyReturnExisting": True }) + "onlyReturnExisting": True}) if http_response.status_code == 200: - old_jws_header["kid"] = http_response.headers["Location"] - new_jws_header["kid"] = http_response.headers["Location"] + private_acme_old_signature["kid"] = http_response.headers["Location"] + private_acme_new_signature["kid"] = http_response.headers["Location"] else: - raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error looking or account URL: {0} {1}" + .format(http_response.status_code, result)) - log.info("Rolls over account key...") + log.info("Rolling over account keys.") # The signature by the new key covers the account URL and the old key, # signifying a request by the new key holder to take over the account from # the old key holder. inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, { - "account": old_jws_header["kid"], - "oldKey": old_jws_header["jwk"] }, - is_inner = True) + "account": private_acme_old_signature["kid"], + "oldKey": private_acme_old_signature["jwk"]}, is_inner=True) # The signature by the old key covers this request and its signature, and # indicates the old key holder's assent to the roll-over request. - http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, inner_payload) + http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, + inner_payload) if http_response.status_code != 200: - raise ValueError("Error rolling over account key: {0} {1}".format(http_response.status_code, result)) - log.info("Account keys rolled over !") + raise ValueError("Error rolling over account key: {0} {1}" + .format(http_response.status_code, result)) + log.info("Keys rolled over.") + def main(argv): + """Parse arguments and roll over the ACME account keys.""" parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description="Tiny ACME client to roll over an ACME account key with another one.", @@ -133,15 +151,21 @@ It will need to have access to the ACME private account keys, so PLEASE READ THR It's around 150 lines, so it won't take long. Example: roll over account key from account.key to newaccount.key: - python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory https://acme-staging-v02.api.letsencrypt.org/directory""") - parser.add_argument("--current", required = True, help="path to the current private account key") - parser.add_argument("--new", required = True, help="path to the newer private account key to register") - parser.add_argument("--acme-directory", required = True, help="ACME directory URL of the ACME server where to remove the key") - parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") + python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \ +https://acme-staging-v02.api.letsencrypt.org/directory""") + parser.add_argument("--current", required=True, + help="path to the current private account key") + parser.add_argument("--new", required=True, + help="path to the newer private account key to register") + parser.add_argument("--acme-directory", required=True, + help="ACME directory URL of the ACME server where to remove the key") + parser.add_argument("--quiet", action="store_const", const=logging.ERROR, + help="suppress output except for errors") args = parser.parse_args(argv) LOGGER.setLevel(args.quiet or logging.INFO) account_rollover(args.current, args.new, args.acme_directory, log=LOGGER) + if __name__ == "__main__": # pragma: no cover main(sys.argv[1:])