# acme_account_delete.py
import subprocess, os, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
3
from urllib.error import HTTPError
4

5
# URL of the ACME directory resource. Can be overridden through the
# GITLABCI_ACMEDIRECTORY environment variable; otherwise the Let's Encrypt
# staging endpoint below is used.
ACMEDirectory = os.environ.get(
    "GITLABCI_ACMEDIRECTORY",
    "https://acme-staging.api.letsencrypt.org/directory",
)

# Module-level logger: INFO and above, emitted through a default StreamHandler.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
LOGGER.addHandler(logging.StreamHandler())
            
            
def delete_account(accountkeypath, log=LOGGER):
    """Delete the ACME account that belongs to an RSA account key.

    The public key is read from the key file with the ``openssl`` command
    line tool, a signed ``new-reg`` request is sent to obtain the account
    URL (a 201 or 409 answer carries it in the ``Location`` header), and a
    signed ``{"resource": "reg", "delete": True}`` request is then posted
    to that URL.

    NOTE(review): the ``new-reg`` / ``resource`` fields look like the
    pre-RFC 8555 ACME protocol -- confirm the configured directory still
    serves it.

    Args:
        accountkeypath: Path to the RSA private key of the account; it is
            handed to ``openssl rsa -in`` and ``openssl dgst -sign``.
        log: Logger used for progress messages.

    Raises:
        IOError: If an ``openssl`` invocation exits with a non-zero status.
        ValueError: If the key output cannot be parsed, if no account URL
            can be obtained from the registration request, or if the
            delete request is not answered with 200 or 202.
    """

    def _b64(b):
        """Return bytes *b* as URL-safe base64 text without ``=`` padding."""
        return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")

    def _openssl(command, options, communicate=None):
        """Run ``openssl <command> <options>`` and return its stdout bytes.

        *communicate*, when given, is written to the process' stdin.
        """
        openssl = subprocess.Popen(
            ["openssl", command] + options,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        out, err = openssl.communicate(communicate)
        if openssl.returncode != 0:
            raise IOError("OpenSSL Error: {0}".format(err))
        return out

    def _send_signed_request(url, payload):
        """POST *payload* to *url* as a JWS signed with the account key.

        Returns a ``(status code, body bytes, headers)`` tuple. HTTP error
        responses are returned in the same shape instead of being raised.
        ``headers`` is the response's message object, so lookups such as
        ``headers.get("Location")`` are case-insensitive.
        """
        payload64 = _b64(json.dumps(payload).encode("utf8"))
        protected = copy.deepcopy(header)
        # A nonce is taken from the Replay-Nonce header of a directory
        # request before every signed request; close the response promptly.
        with urlopen(ACMEDirectory) as nonce_response:
            protected["nonce"] = nonce_response.headers["Replay-Nonce"]
        protected64 = _b64(json.dumps(protected).encode("utf8"))
        signature = _openssl(
            "dgst", ["-sha256", "-sign", accountkeypath],
            "{0}.{1}".format(protected64, payload64).encode("utf8"))
        data = json.dumps({
            "header": header, "protected": protected64,
            "payload": payload64, "signature": _b64(signature),
        })
        try:
            with urlopen(url, data.encode("utf8")) as resp:
                return resp.getcode(), resp.read(), resp.headers
        except HTTPError as httperror:
            # An HTTPError wraps the error response; read it, then release
            # the underlying connection.
            try:
                return httperror.getcode(), httperror.read(), httperror.headers
            finally:
                httperror.close()

    # parse account key to get public key
    log.info("Parsing account key...")
    accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
    key_match = re.search(
        r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
        accountkey.decode("utf8"), re.MULTILINE | re.DOTALL)
    if key_match is None:
        raise ValueError(
            "Could not find modulus/publicExponent in openssl output for "
            "'{0}'".format(accountkeypath))
    pub_hex, pub_exp = key_match.groups()
    # The exponent is printed in decimal; convert it to an even-length hex
    # string so it can be unhexlified.
    pub_exp = "{0:x}".format(int(pub_exp))
    pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
    header = {
        "alg": "RS256",
        "jwk": {
            "e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
            "kty": "RSA",
            "n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
        },
    }

    # get ACME server configuration from the directory
    with urlopen(ACMEDirectory) as directory:
        acme_config = json.loads(directory.read().decode("utf8"))

    log.info("Register account to get account URL.")
    code, result, headers = _send_signed_request(acme_config["new-reg"], {
        "resource": "new-reg"
    })

    # Only 201 (created) and 409 (already registered) carry the account URL;
    # anything else is a failure and must not fall through to the delete.
    if code not in (201, 409):
        raise ValueError("Error registering account: {0} {1}".format(code, result))
    account_url = headers.get("Location")
    if not account_url:
        raise ValueError(
            "No account URL (Location header) in registration response: "
            "{0} {1}".format(code, result))
    if code == 201:
        log.info("Registered! (account: '{0}')".format(account_url))
    else:
        log.info("Already registered! (account: '{0}')".format(account_url))

    log.info("Delete account...")
    code, result, headers = _send_signed_request(account_url, {
        "resource": "reg",
        "delete": True,
    })

    if code not in [200, 202]:
        raise ValueError("Error deleting account key: {0} {1}".format(code, result))
    log.info("Account key deleted !")