Commit 1882dd3d authored by Adrien Dorsaz's avatar Adrien Dorsaz

Merge branch '3-create-a-script-to-implement-account-key-rollover' into 'master'

Resolve "Create a script to implement account key rollover"

Closes #3

See merge request !8
parents a07d43ae 1b5a6650
Pipeline #137 failed with stage
in 1 minute and 12 seconds
before_script:
- apt-get update -qy
- apt-get install -qy python3-dev python3-pip
- pip3 install -r tests/requirements.txt
jessie:
image: adt-jessie_dnspython3_1.11
before_script:
- pip3 install -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
coverage:
jessie_backport:
image: adt-jessie_dnspython3_1.15-bpo
before_script:
- pip3 install -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests
- coverage report --include=acme_dns_tiny.py
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
from .test_module import TestModule
......@@ -15,12 +15,12 @@ TSIGKEYVALUE = os.getenv("GITLABCI_TSIGKEYVALUE")
TSIGALGORITHM = os.getenv("GITLABCI_TSIGALGORITHM")
# generate account and domain keys
def gen_config():
def generate_acme_dns_tiny_config():
# good account key
account_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", account_key.name, "2048"]).wait()
# weak 1024 bit key
# weak 1024 bit account key
weak_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", weak_key.name, "1024"]).wait()
......@@ -44,7 +44,7 @@ def gen_config():
account_csr = NamedTemporaryFile()
Popen(["openssl", "req", "-new", "-sha256", "-key", account_key.name,
"-subj", "/CN={0}".format(DOMAIN), "-out", account_csr.name]).wait()
# Default test configuration
config = configparser.ConfigParser()
config.read("./example.ini".format(DOMAIN))
......@@ -58,47 +58,47 @@ def gen_config():
config["DNS"]["Host"] = DNSHOST
config["DNS"]["Port"] = DNSPORT
config["DNS"]["Zone"] = DNSZONE
goodCName = NamedTemporaryFile()
config["acmednstiny"]["AccountKeyFile"] = account_key.name
config["acmednstiny"]["CSRFile"] = domain_csr.name
with open(goodCName.name, 'w') as configfile:
config.write(configfile)
dnsHostIP = NamedTemporaryFile()
config["DNS"]["Host"] = DNSHOSTIP
with open(dnsHostIP.name, 'w') as configfile:
config.write(configfile)
config["DNS"]["Host"] = DNSHOST
goodSAN = NamedTemporaryFile()
config["acmednstiny"]["AccountKeyFile"] = account_key.name
config["acmednstiny"]["CSRFile"] = san_csr.name
with open(goodSAN.name, 'w') as configfile:
config.write(configfile)
weakKey = NamedTemporaryFile()
config["acmednstiny"]["AccountKeyFile"] = weak_key.name
config["acmednstiny"]["CSRFile"] = domain_csr.name
with open(weakKey.name, 'w') as configfile:
config.write(configfile)
accountAsDomain = NamedTemporaryFile()
config["acmednstiny"]["AccountKeyFile"] = account_key.name
config["acmednstiny"]["CSRFile"] = account_csr.name
with open(accountAsDomain.name, 'w') as configfile:
config.write(configfile)
invalidTSIGName = NamedTemporaryFile()
config["TSIGKeyring"]["KeyName"] = "{0}.invalid".format(TSIGKEYNAME)
with open(invalidTSIGName.name, 'w') as configfile:
config.write(configfile)
missingDNS = NamedTemporaryFile()
config["DNS"] = {}
with open(missingDNS.name, 'w') as configfile:
config.write(configfile)
return {
# configs
"goodCName": goodCName,
......@@ -118,3 +118,23 @@ def gen_config():
"accountcsr": account_csr
}
# generate two account keys to roll over them
def generate_acme_account_rollover_config():
# Old account key
old_account_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", old_account_key.name, "2048"]).wait()
# New account key
new_account_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", new_account_key.name, "2048"]).wait()
return {
# keys (returned to keep files on system)
"oldaccountkey": old_account_key,
"newaccountkey": new_account_key
}
# generate an account key to delete it
def generate_acme_account_delete_config():
# account key
account_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", account_key.name, "2048"]).wait()
return account_key
......@@ -2,4 +2,3 @@ coverage
logassert
argparse
configparser
dnspython>=1.15
import unittest, sys, os
from subprocess import Popen, PIPE
from io import StringIO
import acme_dns_tiny
from tests.config_factory import generate_acme_account_delete_config
import tools.acme_account_delete
import logassert
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
class TestACMEAccountDelete(unittest.TestCase):
"Tests for acme_account_delete"
@classmethod
def setUpClass(self):
self.accountkey = generate_acme_account_delete_config()
super(TestACMEAccountDelete, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# close temp files correctly
self.accountkey.close()
super(TestACMEAccountDelete, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_account_delete')
def test_success_account_delete(self):
""" Test success account key delete """
tools.acme_account_delete.main(["--account-key", self.accountkey.name,
"--acme-directory", ACMEDirectory])
self.assertLoggedInfo("Account key deleted !")
if __name__ == "__main__":
unittest.main()
import unittest, sys, os
from subprocess import Popen, PIPE
from io import StringIO
import acme_dns_tiny
from tests.config_factory import generate_acme_account_rollover_config
from tools.acme_account_delete import account_delete
import tools.acme_account_rollover
import logassert
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
class TestACMEAccountRollover(unittest.TestCase):
"Tests for acme_account_rollover"
@classmethod
def setUpClass(self):
self.configs = generate_acme_account_rollover_config()
super(TestACMEAccountRollover, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# delete account key registration at end of tests
account_delete(self.configs["newaccountkey"].name, ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
self.configs[tmpfile].close()
super(TestACMEAccountRollover, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_account_rollover')
def test_success_account_rollover(self):
""" Test success account key rollover """
tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'].name,
"--new", self.configs['newaccountkey'].name,
"--acme-directory", ACMEDirectory])
self.assertLoggedInfo("Account keys rolled over !")
if __name__ == "__main__":
unittest.main()
import unittest, sys
import unittest, sys, os
from subprocess import Popen, PIPE
from io import StringIO
import acme_dns_tiny
from .config_maker import gen_config
from .acme_account_delete import delete_account
from tests.config_factory import generate_acme_dns_tiny_config
from tools.acme_account_delete import account_delete
import logassert
class TestModule(unittest.TestCase):
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
class TestACMEDNSTiny(unittest.TestCase):
"Tests for acme_dns_tiny.get_crt()"
@classmethod
def setUpClass(self):
self.configs = gen_config()
super(TestModule, self).setUpClass()
self.configs = generate_acme_dns_tiny_config()
super(TestACMEDNSTiny, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# delete account key registration at end of tests
delete_account(self.configs["accountkey"].name)
account_delete(self.configs["accountkey"].name, ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
self.configs[tmpfile].close()
super(TestModule, self).tearDownClass()
super(TestACMEDNSTiny, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_dns_tiny_logger')
......@@ -39,7 +41,7 @@ class TestModule(unittest.TestCase):
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
def test_success_dnshost_ip(self):
""" When DNS Host is an IP, DNS resolution have to fail without error """
old_stdout = sys.stdout
......
import subprocess, os, json, base64, binascii, re, copy, logging
import argparse, subprocess, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
LOGGER = logging.getLogger(__name__)
LOGGER = logging.getLogger("acme_account_delete")
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def delete_account(accountkeypath, log=LOGGER):
def account_delete(accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
......@@ -26,13 +24,13 @@ def delete_account(accountkeypath, log=LOGGER):
def _send_signed_request(url, payload):
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(header)
protected["nonce"] = jws_nonce or urlopen(ACMEDirectory).getheader("Replay-Nonce", None)
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or urlopen(acme_directory).getheader("Replay-Nonce", None)
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
data = json.dumps({
"header": header, "protected": protected64,
"header": jws_header, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
})
try:
......@@ -51,7 +49,7 @@ def delete_account(accountkeypath, log=LOGGER):
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
header = {
jws_header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
......@@ -61,7 +59,7 @@ def delete_account(accountkeypath, log=LOGGER):
}
# get ACME server configuration from the directory
directory = urlopen(ACMEDirectory)
directory = urlopen(acme_directory)
acme_config = json.loads(directory.read().decode("utf8"))
jws_nonce = None
......@@ -86,3 +84,31 @@ def delete_account(accountkeypath, log=LOGGER):
if code not in [200,202]:
raise ValueError("Error deleting account key: {0} {1}".format(code, result))
log.info("Account key deleted !")
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""
This script *deletes* your account from an ACME server.
It will need to have access to your private account key, so
PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.
=== Example Usage ===
Remove account.key from staging Let's Encrypt:
python3 acme_account_delete.py --account-key account.key --acme-directory https://acme-staging.api.letsencrypt.org/directory
"""
)
parser.add_argument("--account-key", required = True, help="path to the private account key to delete")
parser.add_argument("--acme-directory", required = True, help="ACME directory URL of the ACME server where to remove the key")
parser.add_argument("--quiet", action="store_const",
const=logging.ERROR,
help="suppress output except for errors")
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
account_delete(args.account_key, args.acme_directory)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
import argparse, subprocess, os, json, base64, binascii, hashlib, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = openssl.communicate(communicate)
if openssl.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
return out
# helper function to get jws_header from account key path
def _jws_header(accountkeypath):
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
jws_header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
}
return jws_header
# helper function to sign request with specified key
def _sign_request(accountkeypath, jwsheader, payload):
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(jwsheader)
protected["nonce"] = jws_nonce or urlopen(acme_directory).getheader("Replay-Nonce", None)
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
signedjws = {
"header": jwsheader, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
}
return signedjws
# helper function make signed requests
def _send_signed_request(accountkeypath, jwsheader, url, payload):
data = json.dumps(_sign_request(accountkeypath, jwsheader, payload))
try:
resp = urlopen(url, data.encode("utf8"))
except HTTPError as httperror:
resp = httperror
finally:
jws_nonce = resp.getheader("Replay-Nonce", None)
return resp.getcode(), resp.read(), resp.getheaders()
log.info("Parsing current account key...")
jws_header = _jws_header(accountkeypath)
log.info("Parsing new account key...")
new_jws_header = _jws_header(new_accountkeypath)
# get ACME server configuration from the directory
directory = urlopen(acme_directory)
acme_config = json.loads(directory.read().decode("utf8"))
jws_nonce = None
log.info("Register account to get account URL.")
code, result, headers = _send_signed_request(accountkeypath, jws_header, acme_config["new-reg"], {
"resource": "new-reg"
})
if code not in [201, 409]:
raise ValueError("Error getting account URL: {0} {1}".format(code,result))
account_url = dict(headers).get("Location")
log.info("Rolls over account key...")
outer_payload = _sign_request(new_accountkeypath, new_jws_header, {
"url": acme_config["key-change"], # currently needed by boulder implementation in inner payload
"account": account_url,
"newKey": new_jws_header["jwk"]})
outer_payload["resource"] = "key-change" # currently needed by boulder implementation
code, result, headers = _send_signed_request(accountkeypath, jws_header, acme_config["key-change"], outer_payload)
if code != 200:
raise ValueError("Error rolling over account key: {0} {1}".format(code, result))
log.info("Account keys rolled over !")
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""
This script *rolls over* your account key on an ACME server.
It will need to have access to your private account key, so
PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.
=== Example Usage ===
Rollover account.keys from account.key to newaccount.key:
python3 acme_account_rollover.py --current account.key --new newaccount.key --acme-directory https://acme-staging.api.letsencrypt.org/directory"""
)
parser.add_argument("--current", required = True, help="path to the current private account key")
parser.add_argument("--new", required = True, help="path to the newer private account key to register")
parser.add_argument("--acme-directory", required = True, help="ACME directory URL of the ACME server where to remove the key")
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
account_rollover(args.current, args.new, args.acme_directory)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment