Commit 66478531 authored by Adrien Dorsaz's avatar Adrien Dorsaz

Merge branch 'v2.1-acme-draft-16' into 'master'

V2.1 acme draft 16, ready order status, general rework

See merge request !13
parents 5ff96857 bff2790f
Pipeline #225 passed with stage
in 10 minutes and 3 seconds
.PHONY: requirements unit_test_acme_dns_tiny_success_san unit_test_acme_account_rollover unit_test_acme_account_deactivate
DEFAULT: requirements
unit_test_acme_dns_tiny_success_san:
python3 -m unittest tests.test_acme_dns_tiny.TestACMEDNSTiny.test_success_san
unit_test_acme_account_rollover:
python3 -m unittest tests.test_acme_account_rollover.TestACMEAccountRollover.test_success_account_rollover
unit_test_acme_account_deactivate:
python3 -m unittest tests.test_acme_account_deactivate.TestACMEAccountDeactivate.test_success_account_deactivate
unit_test_all_with_coverage:
python3-coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate
python3-coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py
python3-coverage html
requirements:
pip3 install --user --upgrade -r tests/requirements.txt
This diff is collapsed.
......@@ -8,7 +8,7 @@ CSRFile = domain.csr
# Optional ACME directory url
# Default: https://acme-staging-v02.api.letsencrypt.org/directory
ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
#ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
# Optional To be able to be reached by ACME provider (e.g. to warn about
# certificate expicration), you can provide some contact informations.
......@@ -18,12 +18,24 @@ ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
# For the mailto URI, the email address part must contains only one address
# without header fields (see [RFC6068]).
# Default: none
Contacts = mailto:mail@example.com;mailto:mail2@example.org
#Contacts = mailto:mail@example.com;mailto:mail2@example.org
# Optional to give hint to the ACME server about your prefered language for errors given by their server
# See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Language for more informations
# Default: en
Language = en
#Language = en
# Optional: ask to request different format of certificate file.
# By default, acme-dns-tiny request a certificate chain with format
# "application/pem-certificate-chain"
# With this format, you can assume the first certificate block is the one for
# your domains, as the ACME RFC force the format to have this certificate first.
#
# If the ACME server support different format, you can specify it here
# (e.g. application/pkix-cert, applicaiton/pkcs7-mime)
# Note that, if the format selected doesn't provide a full chain, you should
# read logs to find the related certificates (see link header with attribute rel=up)
#CertificateFormat = application/pem-certificate-chain
[TSIGKeyring]
# Required TSIG key name
......@@ -43,11 +55,11 @@ Zone = dnszone
Host = dnsserver
# Optional port to connect on DNS server (default: 53)
Port = 53
#Port = 53
# Optional time to live (TTL) value used to add DNS entries
# For each domain registered in the CSR, at least 1 TTL is waited before certificate creation.
# If an error occurs while looking for TXT records, we wait up to 10 TTLs by domain.
# That's why the default is only of 10 seconds, to avoid having too long time to wait to receive a new certificate.
# Default: 10 seconds
TTL = 10
#TTL = 10
This diff is collapsed.
import unittest, os, time
import unittest, os, time, configparser
import acme_dns_tiny
from tests.config_factory import generate_acme_account_deactivate_config
import tools.acme_account_deactivate
......@@ -23,8 +23,14 @@ class TestACMEAccountDeactivate(unittest.TestCase):
@classmethod
def tearDownClass(self):
# Remove temporary files
os.remove(self.configs['config'])
os.remove(self.configs['key'])
parser = configparser.ConfigParser()
parser.read(self.configs['config'])
try:
os.remove(parser["acmednstiny"]["AccountKeyFile"])
os.remove(parser["acmednstiny"]["CSRFile"])
os.remove(self.configs['config'])
except:
pass
super(TestACMEAccountDeactivate, self).tearDownClass()
def test_success_account_deactivate(self):
......
import unittest, os, time
import unittest, os, time, configparser
import acme_dns_tiny
from tests.config_factory import generate_acme_account_rollover_config
from tools.acme_account_deactivate import account_deactivate
......@@ -19,10 +19,18 @@ class TestACMEAccountRollover(unittest.TestCase):
@classmethod
def tearDownClass(self):
# deactivate account key registration at end of tests
account_deactivate(self.configs["oldaccountkey"], ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
os.remove(self.configs[tmpfile])
# (we assume the key has been roll oved)
account_deactivate(self.configs["newaccountkey"], ACMEDirectory)
# Remove temporary files
parser = configparser.ConfigParser()
parser.read(self.configs['config'])
try:
os.remove(parser["acmednstiny"]["AccountKeyFile"])
os.remove(parser["acmednstiny"]["CSRFile"])
os.remove(self.configs["newaccountkey"])
os.remove(self.configs['config'])
except:
pass
super(TestACMEAccountRollover, self).tearDownClass()
def test_success_account_rollover(self):
......
import unittest, sys, os, subprocess, time
import unittest, sys, os, subprocess, time, configparser
from io import StringIO
import dns.version
import acme_dns_tiny
......@@ -22,11 +22,19 @@ class TestACMEDNSTiny(unittest.TestCase):
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# deactivate account key registration at end of tests
account_deactivate(self.configs["accountkey"], ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
os.remove(self.configs[tmpfile])
for conffile in self.configs:
parser = configparser.ConfigParser()
parser.read(conffile)
try:
os.remove(parser["acmednstiny"]["AccountKeyFile"])
os.remove(parser["acmednstiny"]["CSRFile"])
# for each configuraiton, deactivate the account key
if conffile != "cnameCSR":
account_deactivate(parser["acmednstiny"]["AccountKeyFile"], ACMEDirectory)
os.remove(conffile)
except:
pass
super(TestACMEDNSTiny, self).tearDownClass()
# helper function to run openssl command
......
......@@ -5,12 +5,12 @@ LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())
def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
def _b64(b):
""""Encodes string as base64 as specified in ACME RFC """
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
"""Run openssl command line and raise IOError on non-zero return."""
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = openssl.communicate(communicate)
......@@ -18,10 +18,13 @@ def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
raise IOError("OpenSSL Error: {0}".format(err))
return out
# helper function to send signed requests
def _send_signed_request(url, payload):
"""Sends signed requests to ACME server."""
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
protected["url"] = url
......@@ -32,22 +35,22 @@ def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
jws = {
"protected": protected64, "payload": payload64, "signature": _b64(signature)
jose = {
"protected": protected64, "payload": payload64,"signature": _b64(signature)
}
try:
resp = requests.post(url, json=jws, headers=joseheaders)
response = requests.post(url, json=jose, headers=joseheaders)
except requests.exceptions.RequestException as error:
resp = error.response
response = error.response
finally:
jws_nonce = resp.headers['Replay-Nonce']
if resp.text != '':
return resp.status_code, resp.json(), resp.headers
else:
return resp.status_code, json.dumps({}), resp.headers
jws_nonce = response.headers['Replay-Nonce']
try:
return response, response.json()
except ValueError as error:
return response, json.dumps({})
# main code
adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'}
adtheaders = {'User-Agent': 'acme-dns-tiny/2.1'}
joseheaders = copy.deepcopy(adtheaders)
joseheaders['Content-Type'] = 'application/jose+json'
......@@ -77,19 +80,19 @@ def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
account_request = {}
account_request["onlyReturnExisting"] = True
code, result, headers = _send_signed_request(acme_config["newAccount"], account_request)
if code == 200:
jws_header["kid"] = headers['Location']
http_response, result = _send_signed_request(acme_config["newAccount"], account_request)
if http_response.status_code == 200:
jws_header["kid"] = http_response.headers['Location']
else:
raise ValueError("Error looking or account URL: {0} {1}".format(code, result))
raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result))
log.info("Deactivating account...")
code, result, headers = _send_signed_request(jws_header["kid"], {"status": "deactivated"})
http_response, result = _send_signed_request(jws_header["kid"], {"status": "deactivated"})
if code == 200:
if http_response.status_code == 200:
log.info("Account key deactivated !")
else:
raise ValueError("Error while deactivating the account key: {0} {1}".format(code, result))
raise ValueError("Error while deactivating the account key: {0} {1}".format(http_response.status_code, result))
def main(argv):
parser = argparse.ArgumentParser(
......
......@@ -4,13 +4,13 @@ import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, req
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())
def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
def _b64(b):
""""Encodes string as base64 as specified in ACME RFC """
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
"""Run openssl command line and raise IOError on non-zero return."""
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = openssl.communicate(communicate)
......@@ -18,8 +18,8 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
raise IOError("OpenSSL Error: {0}".format(err))
return out
# helper function to get jws_header from account key path
def _jws_header(accountkeypath):
"""Creates a JWS header according to a specific account key path."""
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
......@@ -37,20 +37,25 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
}
return jws_header
# helper function to sign request with specified key path
def _sign_request(url, keypath, payload):
def _sign_request(url, keypath, payload, is_inner = False):
"""Signs request with a specific right account key."""
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
if keypath == accountkeypath:
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
elif keypath == new_accountkeypath:
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _b64(json.dumps(payload).encode("utf8"))
if keypath == new_accountkeypath:
protected = copy.deepcopy(new_jws_header)
if (keypath == new_accountkeypath
or url == acme_config["newAccount"]):
elif keypath == old_accountkeypath:
protected = copy.deepcopy(old_jws_header)
if is_inner or url == acme_config["newAccount"]:
del protected["kid"]
else:
del protected["jwk"]
if not is_inner:
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
protected["url"] = url
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", keypath],
......@@ -60,20 +65,20 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
}
return signedjws
# helper function make signed requests
def _send_signed_request(url, keypath, payload):
"""Sends signed requests to ACME server."""
nonlocal jws_nonce
jws = _sign_request(url, keypath, payload)
jose = _sign_request(url, keypath, payload)
try:
resp = requests.post(url, json=jws, headers=joseheaders)
response = requests.post(url, json=jose, headers=joseheaders)
except requests.exceptions.RequestException as error:
resp = error.response
response = error.response
finally:
jws_nonce = resp.headers['Replay-Nonce']
if resp.text != '':
return resp.status_code, resp.json(), resp.headers
else:
return resp.status_code, json.dumps({}), resp.headers
jws_nonce = response.headers['Replay-Nonce']
try:
return response, response.json()
except ValueError as error:
return response, json.dumps({})
# main code
adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'}
......@@ -85,29 +90,37 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
acme_config = directory.json()
log.info("Parsing current account key...")
jws_header = _jws_header(accountkeypath)
old_jws_header = _jws_header(old_accountkeypath)
log.info("Parsing new account key...")
new_jws_header = _jws_header(new_accountkeypath)
del new_jws_header["kid"]
jws_nonce = None
log.info("Ask CA provider account url.")
code, result, headers = _send_signed_request(acme_config["newAccount"], accountkeypath, {
http_response, result = _send_signed_request(acme_config["newAccount"], old_accountkeypath, {
"onlyReturnExisting": True })
if code == 200:
jws_header["kid"] = headers["Location"]
if http_response.status_code == 200:
old_jws_header["kid"] = http_response.headers["Location"]
new_jws_header["kid"] = http_response.headers["Location"]
else:
raise ValueError("Error looking or account URL: {0} {1}".format(code, result))
raise ValueError("Error looking or account URL: {0} {1}".format(http_response.status_code, result))
log.info("Rolls over account key...")
outer_payload = _sign_request(jws_header["kid"], new_accountkeypath, {
"account": jws_header["kid"],
"newKey": new_jws_header["jwk"] })
code, result, headers = _send_signed_request(jws_header["kid"], accountkeypath, outer_payload)
if code != 200:
raise ValueError("Error rolling over account key: {0} {1}".format(code, result))
# The signature by the new key covers the account URL and the old key,
# signifying a request by the new key holder to take over the account from
# the old key holder.
inner_payload = _sign_request(acme_config["keyChange"], new_accountkeypath, {
"account": old_jws_header["kid"],
"oldKey": old_jws_header["jwk"] },
is_inner = True)
# The signature by the old key covers this request and its signature, and
# indicates the old key holder's assent to the roll-over request.
http_response, result = _send_signed_request(acme_config["keyChange"], old_accountkeypath, inner_payload)
if http_response.status_code != 200:
raise ValueError("Error rolling over account key: {0} {1}".format(http_response.status_code, result))
log.info("Account keys rolled over !")
def main(argv):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment