#!/usr/bin/env python3
# pylint: disable=too-many-statements
"""Tiny script to rollover two keys for an ACME account"""
import sys
import argparse
import subprocess
import json
import base64
import binascii
import re
import copy
import logging
import requests
14 15 16 17

# Module-level logger; it is the default ``log`` argument of account_rollover()
# and main() sets its level from the --quiet flag.
LOGGER = logging.getLogger("acme_account_rollover")
# Attach a stream handler so that log records are actually emitted.
LOGGER.addHandler(logging.StreamHandler())

18

19
def _b64(text):
20
    """Encodes text as base64 as specified in ACME RFC."""
21 22
    return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")

23

24 25 26 27 28 29 30 31 32
def _openssl(command, options, communicate=None):
    """Run openssl command line and raise IOError on non-zero return."""
    openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = openssl.communicate(communicate)
    if openssl.returncode != 0:
        raise IOError("OpenSSL Error: {0}".format(err))
    return out

33

34
def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
    """Rollover the old and new account key for an ACME account.

    The function fetches the ACME directory, looks up the account URL with the
    old key (``newAccount`` with ``onlyReturnExisting``), then posts to the
    ``keyChange`` resource an inner request signed by the new key wrapped in
    an outer request signed by the old key.

    ``old_accountkeypath`` and ``new_accountkeypath`` are paths to the RSA
    private keys handed to openssl, ``acme_directory`` is the directory URL
    and ``log`` is the logger used for progress messages.

    Raises ValueError when the account lookup or the key change does not
    answer with HTTP 200, when a key cannot be parsed, or when a request is
    signed with a key path that is neither the old nor the new one.
    Errors from openssl and connection errors from requests are propagated.
    """
    def _get_private_acme_signature(accountkeypath):
        """Read the account key to get the signature to authenticate with the ACME server."""
        accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
        # The pattern skips the leading "00:" byte of the printed modulus.
        match = re.search(
            r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
            accountkey.decode("utf8"), re.MULTILINE)
        if match is None:
            raise ValueError(
                "Unable to parse the RSA public key from {0}".format(accountkeypath))
        pub_hex, pub_exp = match.groups()
        pub_exp = "{0:x}".format(int(pub_exp))
        # unhexlify needs an even number of hexadecimal digits.
        pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
        return {
            "alg": "RS256",
            "jwk": {
                "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
                "kty": "RSA",
                "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
            },
        }

    def _sign_request(url, keypath, payload, is_inner=False):
        """Signs request with a specific right account key."""
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        if keypath == new_accountkeypath:
            protected = copy.deepcopy(private_acme_new_signature)
        elif keypath == old_accountkeypath:
            protected = copy.deepcopy(private_acme_old_signature)
        else:
            raise ValueError("Unknown account key path: {0}".format(keypath))

        # Inner key-change requests and newAccount requests are identified by
        # the public key ("jwk"); every other request by the account URL ("kid").
        if is_inner or url == acme_config["newAccount"]:
            protected.pop("kid", None)
        else:
            del protected["jwk"]

        # Only the outer request carries a nonce; fetch one when none is cached.
        if not is_inner:
            protected["nonce"] = (nonce
                                  or requests.get(acme_config["newNonce"], headers=adtheaders)
                                  .headers['Replay-Nonce'])
        protected["url"] = url
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", keypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        return {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }

    def _send_signed_request(url, keypath, payload):
        """Sends signed requests to ACME server."""
        nonlocal nonce
        jose = _sign_request(url, keypath, payload)
        joseheaders = {
            'User-Agent': adtheaders.get('User-Agent'),
            'Content-Type': 'application/jose+json'
        }
        try:
            response = requests.post(url, json=jose, headers=joseheaders)
        except requests.exceptions.RequestException as error:
            response = error.response
            if response is None:
                # No HTTP response at all (e.g. connection failure): surface
                # the real error instead of failing on a missing attribute.
                raise
        # A missing header resets the cached nonce, so that the next request
        # fetches a fresh one from the newNonce resource.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adtheaders = {'User-Agent': 'acme-dns-tiny/2.2'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adtheaders).json()

    log.info("Get private signature from old account key.")
    private_acme_old_signature = _get_private_acme_signature(old_accountkeypath)

    log.info("Get private signature from new account key.")
    private_acme_new_signature = _get_private_acme_signature(new_accountkeypath)

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
        "onlyReturnExisting": True})
    if http_response.status_code == 200:
        private_acme_old_signature["kid"] = http_response.headers["Location"]
        private_acme_new_signature["kid"] = http_response.headers["Location"]
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Rolling over account keys.")
    # The signature by the new key covers the account URL and the old key,
    # signifying a request by the new key holder to take over the account from
    # the old key holder.
    inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
        "account": private_acme_old_signature["kid"],
        "oldKey": private_acme_old_signature["jwk"]}, is_inner=True)
    # The signature by the old key covers this request and its signature, and
    # indicates the old key holder's assent to the roll-over request.
    http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath,
                                                 inner_payload)

    if http_response.status_code != 200:
        raise ValueError("Error rolling over account key: {0} {1}"
                         .format(http_response.status_code, result))
    log.info("Keys rolled over.")
141

142

143
def main(argv):
    """Parse arguments and roll over the ACME account keys.

    ``argv`` is the list of command-line arguments without the program name.
    ``--current``, ``--new`` and ``--acme-directory`` are required; argparse
    exits the process when they are missing or when ``--help`` is given.
    ``--quiet`` raises the logger level to ERROR, otherwise INFO is used.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME client to roll over an ACME account key with another one.",
        epilog="""This script *rolls over* ACME account keys.

It will need to have access to the ACME private account keys, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: roll over account key from account.key to newaccount.key:
  python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory""")
    parser.add_argument("--current", required=True,
                        help="path to the current private account key")
    parser.add_argument("--new", required=True,
                        help="path to the newer private account key to register")
    parser.add_argument("--acme-directory", required=True,
                        help="ACME directory URL of the ACME server where to roll over the key")
    parser.add_argument("--quiet", action="store_const", const=logging.ERROR,
                        help="suppress output except for errors")
    args = parser.parse_args(argv)

    # args.quiet is None unless --quiet was given, so INFO is the default level.
    LOGGER.setLevel(args.quiet or logging.INFO)
    account_rollover(args.current, args.new, args.acme_directory, log=LOGGER)
168

169

170 171
# Run the command-line entry point only when the file is executed as a script;
# the program name is stripped from the argument list before parsing.
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])