Commit d87cdd84 authored by Adrien Dorsaz's avatar Adrien Dorsaz

key rollover has been completely updated (acme has changed way to rollover and...

key rollover has been completely updated (acme has changed way to rollover and we needed to clarify better the code variable names)
parent ed51983f
Pipeline #215 failed with stage
in 6 minutes and 49 seconds
......@@ -4,7 +4,7 @@ import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, req
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())
def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
def _b64(b):
""""Encodes string as base64 as specified in ACME RFC """
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
......@@ -37,21 +37,28 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
}
return jws_header
def _sign_request(url, keypath, payload):
def _sign_request(url, keypath, payload, is_inner = False):
"""Signs request with a specific right account key."""
nonlocal jws_nonce
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _b64(json.dumps(payload).encode("utf8"))
if keypath == accountkeypath:
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
elif keypath == new_accountkeypath:
if keypath == new_accountkeypath:
protected = copy.deepcopy(new_jws_header)
if (keypath == new_accountkeypath
or url == acme_config["newAccount"]):
elif keypath == old_accountkeypath:
protected = copy.deepcopy(old_jws_header)
if is_inner or url == acme_config["newAccount"]:
del protected["kid"]
else:
del protected["jwk"]
if not is_inner:
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
protected["url"] = url
log.info("is_inner: {0}".format(is_inner))
log.info("Protected header created: {0}".format(protected))
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", keypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
......@@ -63,17 +70,17 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
def _send_signed_request(url, keypath, payload):
"""Sends signed requests to ACME server."""
nonlocal jws_nonce
jws = _sign_request(url, keypath, payload)
jose = _sign_request(url, keypath, payload)
try:
resp = requests.post(url, json=jws, headers=joseheaders)
response = requests.post(url, json=jose, headers=joseheaders)
except requests.exceptions.RequestException as error:
resp = error.response
response = error.response
finally:
jws_nonce = resp.headers['Replay-Nonce']
if resp.text != '':
return resp.status_code, resp.json(), resp.headers
else:
return resp.status_code, json.dumps({}), resp.headers
jws_nonce = response.headers['Replay-Nonce']
try:
return response, response.json()
except ValueError as error:
return response, json.dumps({})
# main code
adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'}
......@@ -85,29 +92,37 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
acme_config = directory.json()
log.info("Parsing current account key...")
jws_header = _jws_header(accountkeypath)
old_jws_header = _jws_header(old_accountkeypath)
log.info("Parsing new account key...")
new_jws_header = _jws_header(new_accountkeypath)
del new_jws_header["kid"]
jws_nonce = None
log.info("Ask CA provider account url.")
code, result, headers = _send_signed_request(acme_config["newAccount"], accountkeypath, {
http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
"onlyReturnExisting": True })
if code == 200:
jws_header["kid"] = headers["Location"]
if http_response.status_code == 200:
old_jws_header["kid"] = http_response.headers["Location"]
new_jws_header["kid"] = http_response.headers["Location"]
else:
raise ValueError("Error looking or account URL: {0} {1}".format(code, result))
raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result))
log.info("Rolls over account key...")
outer_payload = _sign_request(jws_header["kid"], new_accountkeypath, {
"account": jws_header["kid"],
"newKey": new_jws_header["jwk"] })
code, result, headers = _send_signed_request(jws_header["kid"], accountkeypath, outer_payload)
if code != 200:
raise ValueError("Error rolling over account key: {0} {1}".format(code, result))
# The signature by the new key covers the account URL and the old key,
# signifying a request by the new key holder to take over the account from
# the old key holder.
inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
"account": old_jws_header["kid"],
"oldKey": old_jws_header["jwk"] },
is_inner = True)
# The signature by the old key covers this request and its signature, and
# indicates the old key holder's assent to the roll-over request.
http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, inner_payload)
if http_response.status_code != 200:
raise ValueError("Error rolling over account key: {0} {1}".format(http_response.status_code, result))
log.info("Account keys rolled over !")
def main(argv):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment