#!/usr/bin/env python3
"""Tiny script to deactivate account on an ACME server."""
import sys
import argparse
import subprocess
import json
import base64
import binascii
import re
import copy
import logging
import requests
# Module-wide logger for this script; a stream handler is attached at import
# time so that log records are actually emitted.
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())


def _b64(text):
    """"Encodes text as base64 as specified in ACME RFC """
    return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")

def _openssl(command, options, communicate=None):
    """Run an openssl sub-command and return its standard output as bytes.

    command: openssl sub-command name (e.g. "rsa", "dgst").
    options: list of additional command-line arguments.
    communicate: optional bytes written to the process standard input.

    Raises IOError when openssl exits with a non-zero return code; the message
    carries the decoded content of its standard error stream.
    """
    # The context manager guarantees the pipes are closed and the child is
    # reaped even if communicate() is interrupted.
    with subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as openssl:
        out, err = openssl.communicate(communicate)
    if openssl.returncode != 0:
        # stderr is bytes: decode it so the message is readable instead of a b'...' repr.
        raise IOError("OpenSSL Error: {0}".format(err.decode("utf8", errors="replace")))
    return out

def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
    """Deactivate the ACME account bound to an RSA account key.

    Parameters:
        accountkeypath: path to the RSA private account key (read through openssl).
        acme_directory: URL of the directory resource of the ACME server.
        log: logger receiving progress messages (defaults to the module LOGGER).

    Raises:
        ValueError: if the public key cannot be extracted from the openssl
            output, if the account lookup does not answer 200, or if the
            deactivation request does not answer 200.
        IOError: if an openssl invocation fails.
        requests.exceptions.RequestException: if a signed request fails
            without any HTTP response being available.
    """

    def _send_signed_request(url, payload):
        """Send a JWS-signed POST to the ACME server.

        Returns a tuple (response, body) where body is the decoded JSON body,
        or an empty dict when the body is empty or not JSON.
        """
        nonlocal nonce
        if payload == "":  # on POST-as-GET, final payload has to be just empty string
            payload64 = ""
        else:
            payload64 = _b64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(private_acme_signature)
        # Reuse the nonce of the previous response when there is one, else ask for a new one.
        protected["nonce"] = nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
        protected["url"] = url
        if url == acme_config["newAccount"]:
            # newAccount requests are identified by the public key ("jwk"), not by "kid".
            protected.pop("kid", None)
        else:
            del protected["jwk"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
                             "{0}.{1}".format(protected64, payload64).encode("utf8"))
        jose = {
            "protected": protected64, "payload": payload64, "signature": _b64(signature)
        }
        joseheaders = {
            'User-Agent': adtheaders.get('User-Agent'),
            'Content-Type': 'application/jose+json'
        }
        try:
            response = requests.post(url, json=jose, headers=joseheaders)
        except requests.exceptions.RequestException as error:
            response = error.response
            if response is None:
                # No HTTP response at all (e.g. connection failure): there is no
                # nonce to read, so surface the original error instead of
                # failing later on a None response.
                nonce = None
                raise
        # A response without Replay-Nonce leaves nonce empty, so the next
        # request fetches a fresh one from newNonce.
        nonce = response.headers.get('Replay-Nonce')
        try:
            return response, response.json()
        except ValueError:  # if body is empty or not JSON formatted
            return response, {}

    # main code
    adtheaders = {'User-Agent': 'acme-dns-tiny/2.2'}
    nonce = None

    log.info("Fetch informations from the ACME directory.")
    acme_config = requests.get(acme_directory, headers=adtheaders).json()

    log.info("Get private signature from account key.")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    key_match = re.search(
        r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
        accountkey.decode("utf8"), re.MULTILINE | re.DOTALL)
    if key_match is None:
        raise ValueError("Unable to read the modulus and public exponent of the account key "
                         "from the openssl output.")
    pub_hex, pub_exp = key_match.groups()
    pub_exp = "{0:x}".format(int(pub_exp))
    # unhexlify needs an even number of hexadecimal digits
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    # That signature is used to authenticate with the ACME server, it needs to be safely kept
    private_acme_signature = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    log.info("Ask to the ACME server the account identifier to complete the private signature.")
    http_response, result = _send_signed_request(acme_config["newAccount"],
                                                 {"onlyReturnExisting": True})
    if http_response.status_code == 200:
        # The account URL returned in Location becomes the key identifier ("kid").
        private_acme_signature["kid"] = http_response.headers['Location']
    else:
        raise ValueError("Error looking or account URL: {0} {1}"
                         .format(http_response.status_code, result))

    log.info("Deactivating the account.")
    http_response, result = _send_signed_request(private_acme_signature["kid"],
                                                 {"status": "deactivated"})

    if http_response.status_code == 200:
        log.info("The account has been deactivated.")
    else:
        raise ValueError("Error while deactivating the account key: {0} {1}"
                         .format(http_response.status_code, result))

def main(argv=None):
    """Parse command-line arguments and deactivate the ACME account.

    argv: list of argument strings, without the program name. When None,
    argparse falls back to sys.argv[1:].

    Exits through SystemExit (raised by argparse) when arguments are invalid.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Tiny ACME script to deactivate an ACME account",
        epilog="""This script permanently *deactivates* an ACME account.

You should revoke all TLS certificates linked to the account *before* using this script,
as the server won't accept any further request when account is deactivated.

It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.

Example: deactivate account.key from staging Let's Encrypt:
  python3 acme_account_deactivate.py --account-key account.key --acme-directory \
https://acme-staging-v02.api.letsencrypt.org/directory"""
    )
    parser.add_argument("--account-key", required=True,
                        help="path to the private account key to deactivate")
    parser.add_argument("--acme-directory", required=True,
                        help="ACME directory URL of the ACME server where to remove the key")
    parser.add_argument("--quiet", action="store_const",
                        const=logging.ERROR,
                        help="suppress output except for errors")
    args = parser.parse_args(argv)

    # args.quiet is None unless --quiet was given, in which case it is logging.ERROR.
    LOGGER.setLevel(args.quiet or logging.INFO)
    account_deactivate(args.account_key, args.acme_directory, log=LOGGER)

# Script entry point: forward the command-line arguments (without the program
# name) to main() when the file is executed directly.
if __name__ == "__main__":  # pragma: no cover
    main(sys.argv[1:])