From da319857d39e010414dd55eb05304c11ecf2fe1a Mon Sep 17 00:00:00 2001 From: Adrien Dorsaz Date: Sun, 24 May 2020 08:21:35 +0200 Subject: [PATCH] account_rollover: adapt script to best practices provided by linter --- tools/acme_account_rollover.py | 140 +++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 61 deletions(-) diff --git a/tools/acme_account_rollover.py b/tools/acme_account_rollover.py index 66d589a..64ccf2e 100644 --- a/tools/acme_account_rollover.py +++ b/tools/acme_account_rollover.py @@ -1,32 +1,44 @@ #!/usr/bin/env python3 -import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, requests +#pylint: disable=too-many-statements +"""Tiny script to rollover two keys for an ACME account""" +import sys +import argparse +import subprocess +import json +import base64 +import binascii +import re +import copy +import logging +import requests LOGGER = logging.getLogger("acme_account_rollover") LOGGER.addHandler(logging.StreamHandler()) +def _b64(text): + """"Encodes text as base64 as specified in ACME RFC """ + return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=") + +def _openssl(command, options, communicate=None): + """Run openssl command line and raise IOError on non-zero return.""" + openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = openssl.communicate(communicate) + if openssl.returncode != 0: + raise IOError("OpenSSL Error: {0}".format(err)) + return out + def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER): - def _b64(b): - """"Encodes string as base64 as specified in ACME RFC """ - return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") - - def _openssl(command, options, communicate=None): - """Run openssl command line and raise IOError on non-zero return.""" - openssl = subprocess.Popen(["openssl", command] + options, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = openssl.communicate(communicate) - if openssl.returncode != 0: - raise IOError("OpenSSL Error: {0}".format(err)) - return out - - def _jws_header(accountkeypath): - """Creates a JWS header according to a specific account key path.""" + """Rollover the old and new account key for an ACME account.""" + def _get_private_acme_signature(accountkeypath): + """Read the account key to get the signature to authenticate with the ACME server.""" accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"]) pub_hex, pub_exp = re.search( r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)", accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups() pub_exp = "{0:x}".format(int(pub_exp)) pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp - jws_header = { + return { "alg": "RS256", "jwk": { "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), @@ -35,19 +47,18 @@ def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log }, "kid": None } - return jws_header - def _sign_request(url, keypath, payload, is_inner = False): + def _sign_request(url, keypath, payload, is_inner=False): """Signs request with a specific right account key.""" - nonlocal jws_nonce + nonlocal nonce if payload == "": # on POST-as-GET, final payload has to be just empty string payload64 = "" else: payload64 = _b64(json.dumps(payload).encode("utf8")) if keypath == new_accountkeypath: - protected = copy.deepcopy(new_jws_header) + protected = copy.deepcopy(private_acme_new_signature) elif keypath == old_accountkeypath: - protected = copy.deepcopy(old_jws_header) + protected = copy.deepcopy(private_acme_old_signature) if is_inner or url == acme_config["newAccount"]: del protected["kid"] @@ -55,75 +66,77 @@ def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log del protected["jwk"] if not is_inner: - protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce'] + protected["nonce"] = (nonce + or requests.get(acme_config["newNonce"]) + .headers['Replay-Nonce']) protected["url"] = url protected64 = _b64(json.dumps(protected).encode("utf8")) signature = _openssl("dgst", ["-sha256", "-sign", keypath], "{0}.{1}".format(protected64, payload64).encode("utf8")) - signedjws = { - "protected": protected64, "payload": payload64,"signature": _b64(signature) + return { + "protected": protected64, "payload": payload64, "signature": _b64(signature) } - return signedjws def _send_signed_request(url, keypath, payload): """Sends signed requests to ACME server.""" - nonlocal jws_nonce + nonlocal nonce jose = _sign_request(url, keypath, payload) + joseheaders = { + 'User-Agent': adtheaders.get('User-Agent'), + 'Content-Type': 'application/jose+json' + } try: response = requests.post(url, json=jose, headers=joseheaders) except requests.exceptions.RequestException as error: response = error.response finally: - jws_nonce = response.headers['Replay-Nonce'] - try: - return response, response.json() - except ValueError as error: - return response, json.dumps({}) + nonce = response.headers['Replay-Nonce'] + if not response.text: + return response, json.dumps({}) + return response, response.json() # main code - adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'} - joseheaders=copy.deepcopy(adtheaders) - joseheaders['Content-Type']='application/jose+json' + adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'} + nonce = None log.info("Fetch informations from the ACME directory.") - directory = requests.get(acme_directory, headers=adtheaders) - acme_config = directory.json() - - log.info("Parsing current account key...") - old_jws_header = _jws_header(old_accountkeypath) + acme_config = requests.get(acme_directory, headers=adtheaders).json() - log.info("Parsing new account key...") - new_jws_header = _jws_header(new_accountkeypath) - del new_jws_header["kid"] + log.info("Get private signature from old account key.") + private_acme_old_signature = _get_private_acme_signature(old_accountkeypath) - jws_nonce = None + log.info("Get private signature from new account key.") + private_acme_new_signature = _get_private_acme_signature(new_accountkeypath) - log.info("Ask CA provider account url.") + log.info("Ask to the ACME server the account identifier to complete the private signature.") http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, { - "onlyReturnExisting": True }) + "onlyReturnExisting": True}) if http_response.status_code == 200: - old_jws_header["kid"] = http_response.headers["Location"] - new_jws_header["kid"] = http_response.headers["Location"] + private_acme_old_signature["kid"] = http_response.headers["Location"] + private_acme_new_signature["kid"] = http_response.headers["Location"] else: - raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result)) + raise ValueError("Error looking or account URL: {0} {1}" + .format(http_response.status_code, result)) - log.info("Rolls over account key...") + log.info("Rolling over account keys.") # The signature by the new key covers the account URL and the old key, # signifying a request by the new key holder to take over the account from # the old key holder. inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, { - "account": old_jws_header["kid"], - "oldKey": old_jws_header["jwk"] }, - is_inner = True) + "account": private_acme_old_signature["kid"], + "oldKey": private_acme_old_signature["jwk"]}, is_inner=True) # The signature by the old key covers this request and its signature, and # indicates the old key holder's assent to the roll-over request. - http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, inner_payload) + http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, + inner_payload) if http_response.status_code != 200: - raise ValueError("Error rolling over account key: {0} {1}".format(http_response.status_code, result)) - log.info("Account keys rolled over !") + raise ValueError("Error rolling over account key: {0} {1}" + .format(http_response.status_code, result)) + log.info("Keys rolled over.") def main(argv): + """Parse arguments and roll over the ACME account keys""" parser = argparse.ArgumentParser( formatter_class=argparse.RawDescriptionHelpFormatter, description="Tiny ACME client to roll over an ACME account key with another one.", @@ -133,11 +146,16 @@ It will need to have access to the ACME private account keys, so PLEASE READ THR It's around 150 lines, so it won't take long. Example: roll over account key from account.key to newaccount.key: - python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory https://acme-staging-v02.api.letsencrypt.org/directory""") - parser.add_argument("--current", required = True, help="path to the current private account key") - parser.add_argument("--new", required = True, help="path to the newer private account key to register") - parser.add_argument("--acme-directory", required = True, help="ACME directory URL of the ACME server where to remove the key") - parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") + python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \ +https://acme-staging-v02.api.letsencrypt.org/directory""") + parser.add_argument("--current", required=True, + help="path to the current private account key") + parser.add_argument("--new", required=True, + help="path to the newer private account key to register") + parser.add_argument("--acme-directory", required=True, + help="ACME directory URL of the ACME server where to remove the key") + parser.add_argument("--quiet", action="store_const", const=logging.ERROR, + help="suppress output except for errors") args = parser.parse_args(argv) LOGGER.setLevel(args.quiet or logging.INFO) -- GitLab