#!/usr/bin/env python3
"""Tiny script to deactivate account on an ACME server."""
import sys
import argparse
import subprocess
import json
import base64
import binascii
import re
import copy
import logging
import requests
# Module-wide logger; a stream handler is attached so that records are emitted
# even when the caller has not configured the logging package.
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())


def _b64(text):
    """Encode bytes as unpadded URL-safe base64, as specified in ACME RFC.

    Args:
        text: the bytes to encode.

    Returns:
        The URL-safe base64 representation as a ``str``, with the trailing
        ``=`` padding characters removed.
    """
    return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")

def _openssl(command, options, communicate=None):
    """Run an openssl command line and return its standard output.

    Args:
        command: openssl sub-command, placed right after ``openssl``.
        options: list of the remaining command line arguments.
        communicate: optional bytes written to the process standard input.

    Returns:
        The bytes the process wrote on its standard output.

    Raises:
        IOError: if the process exits with a non-zero return code; the
            message carries the decoded standard error output.
    """
    # The context manager closes the three pipes once the process is done.
    with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
        out, err = openssl.communicate(communicate)
    if openssl.returncode != 0:
        # err is bytes: decode it so the message is not rendered as "b'...'".
        message = err.decode("utf8", errors="replace").strip()
        raise IOError("OpenSSL Error: {0}".format(message))
    return out

def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
    """Deactivate the ACME account bound to an RSA account key.

    The function fetches the ACME directory, builds the JWS protected header
    from the public part of the account key, asks the server for the URL of
    the existing account (``onlyReturnExisting``) and finally posts
    ``{"status": "deactivated"}`` to that URL.

    Args:
        accountkeypath: path of the account private key; it is handed to the
            ``openssl`` command line for parsing and for signing.
        acme_directory: URL of the ACME directory resource.
        log: logger receiving the progress messages (defaults to ``LOGGER``).

    Raises:
        ValueError: if the public key cannot be read from the OpenSSL output,
            or if the server does not answer 200 to the account lookup or to
            the deactivation request.
        IOError: if an ``openssl`` invocation fails.
        requests.exceptions.RequestException: if a signed request fails
            without any HTTP response to work with.
    """

    def _send_signed_request(url, payload):
        """Sign ``payload`` as a JWS, POST it to ``url``, return (response, body).

        The body is the decoded JSON document of the response, or an empty
        dict when the response body is empty or not JSON formatted.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        # Work on a copy: the header is adapted per request below.
        protected = copy.deepcopy(private_acme_signature)
        protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
        protected["url"] = url
        if url == acme_config["newAccount"]:
            # The newAccount request carries the "jwk" header, never "kid".
            protected.pop("kid", None)
        else:
            # Every other request carries "kid" (the account URL) instead of "jwk".
            del protected["jwk"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }
        joseheaders = {
            'User-Agent': adtheaders.get('User-Agent'),
            'Content-Type': 'application/jose+json'
        }
        try:
            response = requests.post(url, json=jose, headers=joseheaders)
        except requests.exceptions.RequestException as error:
            # Keep working with the HTTP answer when there is one; without it
            # there is no nonce and no status to read, so let the error through.
            if error.response is None:
                raise
            response = error.response
        # A missing header leaves the nonce empty, so the next call requests a
        # fresh one from the newNonce resource.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adtheaders = {'User-Agent': 'acme-dns-tiny/2.2'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adtheaders).json()

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    key_match = re.search(
        r"modulus:\s+?00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
        accountkey.decode("utf8"), re.MULTILINE)
    if key_match is None:
        raise ValueError("Unable to read the RSA public key from the OpenSSL output.")
    pub_hex, pub_exp = key_match.groups()
    # The exponent is printed in decimal: convert it to an even-length hex string
    # so that it can be turned into bytes.
    pub_exp = "{0:x}".format(int(pub_exp))
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"],
                                                 {"onlyReturnExisting": True})
    if http_response.status_code == 200:
        # The account URL returned by the server becomes the "kid" of later requests.
        private_acme_signature["kid"] = http_response.headers['Location']
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Deactivating the account.")
    http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                 {"status": "deactivated"})

    if http_response.status_code == 200:
        log.info("The account has been deactivated.")
    else:
        raise ValueError("Error while deactivating the account key: {0} {1}"
                         .format(http_response.status_code, result))


def main(argv):
    """Parse the command line arguments and deactivate the given account.

    Args:
        argv: list of command line arguments, without the program name.
    """
    parser = argparse.ArgumentParser(
        # Raw formatter: the epilog below is printed with its own line breaks.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME script to deactivate an ACME account",
        epilog="""This script permanently *deactivates* an ACME account.

You should revoke all TLS certificates linked to the account *before* using this script,
as the server won't accept any further request when account is deactivated.

It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: deactivate account.key from staging Let's Encrypt:
  python3 acme_account_deactivate.py --account-key account.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory"""
    )
    parser.add_argument("--account-key", required=True,
                        help="path to the private account key to deactivate")
    parser.add_argument("--acme-directory", required=True,
                        help="ACME directory URL of the ACME server where to remove the key")
    parser.add_argument("--quiet", action="store_const",
                        const=logging.ERROR,
                        help="suppress output except for errors")
    args = parser.parse_args(argv)

    # args.quiet is logging.ERROR when --quiet is given, None otherwise.
    LOGGER.setLevel(args.quiet or logging.INFO)
    account_deactivate(args.account_key, args.acme_directory, log=LOGGER)


# Run the command line interface only when the file is executed as a script;
# the program name (sys.argv[0]) is not passed to main().
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])