Commit bdc36eb8 authored by Adrien Dorsaz's avatar Adrien Dorsaz

Merge branch 'acmedirectory_rebase' into 'master'

Use of ACME Directory and automatic update agreement to terms of service

* Update main script to:
  * Use the ACME directory to retrieve correct URLs from the ACME servers
  * Be able to update account informations (contact information and agreed terms of service)
  * Read terms of service links from either the ACME directory or account informations
  * Automatically update terms of service agreement with latest changes (user have to stay informed about the updates himself)
  * Fix HTTP errors handling
* Update configuration to:
  *  Configure ACME directory instead of CAUrl
  *  Allow to specify contact informations (email and phone number ; beware that Let's Encrypt servers don't allow phones)
* Update tests to:
  * Correctly setup before running tests without using global variable
  * Correctly clean setup after running tests (correctly remove ACME account and close temporary files)
  * Update requirements to latest dnspython release (as release 1.15 has fixed the dns updates issue)
* Update account delete script (you can find it in /tests/) according to updates of the main script

See merge request !5
parents 4f2dfcb7 a604a6d5
Pipeline #94 passed with stage
in 1 minute and 36 seconds
......@@ -5,5 +5,5 @@ before_script:
coverage:
script:
- coverage run --source ./ -m unittest tests
- coverage run --source ./ -m unittest -v tests
- coverage report --include=acme_dns_tiny.py
......@@ -9,10 +9,15 @@ validation.
Since it has to have access to your private ACME account key and the
rights to update the DNS records of your DNS server, this code has been designed
to be as tiny as possible (currently less than 250 lines).
to be as tiny as possible (currently less than 300 lines).
The only prerequisites are Python 3, OpenSSL and the dnspython module (with
release before 1.14.0 (this release have a bug with dynamic DNS updates)).
The only prerequisites are Python 3, OpenSSL and the dnspython module.
For the dnspython module, be aware that it won't work with release 1.14.0,
because this one have a bug with dynamic DNS updates.
You should either use an older version from dnspython3 module (python3 specific
code) or any release of dnspython module (pyhton2 and python3 merged code) since
1.15.0.
**PLEASE READ THE SOURCE CODE! YOU MUST TRUST IT!
IT HANDLES YOUR ACCOUNT PRIVATE KEYS!**
......
......@@ -3,15 +3,16 @@ import argparse, subprocess, json, sys, base64, binascii, time, hashlib, re, cop
import dns.resolver, dns.tsigkeyring, dns.update
from configparser import ConfigParser
from urllib.request import urlopen
from urllib.error import HTTPError
LOGGER = logging.getLogger('acme_dns_tiny_logger')
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def get_crt(config, log=LOGGER):
# helper function base64 encode for jose spec
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").replace("=", "")
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
......@@ -34,26 +35,39 @@ def get_crt(config, log=LOGGER):
dns_update = None
return resp
# helper function make signed requests
# helper function to send signed requests
def _send_signed_request(url, payload):
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(config["acmednstiny"]["CAUrl"] + "/directory").headers["Replay-Nonce"]
protected = copy.deepcopy(jws_header)
protected["nonce"] = urlopen(config["acmednstiny"]["ACMEDirectory"]).headers["Replay-Nonce"]
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", config["acmednstiny"]["AccountKeyFile"]],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
data = json.dumps({
"header": header, "protected": protected64,
"header": jws_header, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
})
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read(), resp.getheaders()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)(), None
except HTTPError as httperror:
return httperror.getcode(), httperror.read(), httperror.getheaders()
# create DNS keyring and resolver
log.info("Prepare DNS tools...")
# helper function to get url from Link HTTP headers
def _get_url_link(headers, rel):
log.info("Looking for Link with rel='{0}' in headers".format(rel))
linkheaders = [link.strip() for link in dict(headers)["Link"].split(',')]
url = [re.match(r'<(?P<url>.*)>.*;rel=(' + re.escape(rel) + r'|("([a-z][a-z0-9\.\-]*\s+)*' + re.escape(rel) + r'[\s"]))', link).groupdict()
for link in linkheaders][0]["url"]
return url
# main code
log.info("Read ACME directory.")
directory = urlopen(config["acmednstiny"]["ACMEDirectory"])
acme_config = json.loads(directory.read().decode("utf8"))
current_terms = acme_config.get("meta", {}).get("terms-of-service")
log.info("Prepare DNS keyring and resolver.")
keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: config["TSIGKeyring"]["KeyValue"]})
nameserver = []
try:
......@@ -72,15 +86,14 @@ def get_crt(config, log=LOGGER):
resolver.nameservers = nameserver
resolver.retry_servfail = True
# parse account key to get public key
log.info("Parsing account key...")
log.info("Parsing account key looking for public key.")
accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
header = {
jws_header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
......@@ -88,11 +101,10 @@ def get_crt(config, log=LOGGER):
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
}
accountkey_json = json.dumps(header["jwk"], sort_keys=True, separators=(",", ":"))
accountkey_json = json.dumps(jws_header["jwk"], sort_keys=True, separators=(",", ":"))
thumbprint = _b64(hashlib.sha256(accountkey_json.encode("utf8")).digest())
# find domains
log.info("Parsing CSR...")
log.info("Parsing CSR looking for domains.")
csr = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], "-noout", "-text"]).decode("utf8")
domains = set([])
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", csr)
......@@ -104,33 +116,62 @@ def get_crt(config, log=LOGGER):
if san.startswith("DNS:"):
domains.add(san[4:])
# get the certificate domains and expiration
log.info("Registering account...")
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-reg", {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf",
})
log.info("Registering ACME Account.")
reg_info = {"resource": "new-reg"}
if current_terms is not None:
reg_info["agreement"] = current_terms
reg_info["contact"] = []
reg_mailto = "mailto:{0}".format(config["acmednstiny"].get("MailContact"))
reg_phone = "tel:{0}".format(config["acmednstiny"].get("PhoneContact"))
if config["acmednstiny"].get("MailContact") is not None:
reg_info["contact"].append(reg_mailto)
if config["acmednstiny"].get("PhoneContact") is not None:
reg_info["contact"].append(reg_phone)
if len(reg_info["contact"]) == 0:
del reg_info["contact"]
code, result, headers = _send_signed_request(acme_config["new-reg"], reg_info)
if code == 201:
log.info("Registered!")
reg_received_contact = reg_info.get("contact")
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
elif code == 409:
log.info("Already registered!")
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
# Client should send empty payload to query account information
code, result, headers = _send_signed_request(account_url, {"resource":"reg"})
account_info = json.loads(result.decode("utf8"))
reg_info["agreement"] = account_info.get("agreement")
reg_received_contact = account_info.get("contact")
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
log.info("Update contact information and terms of service agreement if needed.")
if current_terms is None:
current_terms = _get_url_link(headers, 'terms-of-service')
if (reg_info.get("agreement") != current_terms
or (config["acmednstiny"].get("MailContact") is not None and reg_mailto not in reg_received_contact)
or (config["acmednstiny"].get("PhoneContact") is not None and reg_phone not in reg_received_contact)):
reg_info["resource"] = "reg"
reg_info["agreement"] = current_terms
code, result, headers = _send_signed_request(account_url, reg_info)
if code == 202:
log.info("Account updated (terms of service agreed: '{0}')".format(reg_info.get("agreement")))
else:
raise ValueError("Error register update: {0} {1}".format(code, result))
# verify each domain
for domain in domains:
log.info("Verifying {0}...".format(domain))
log.info("Verifying domain: {0}".format(domain))
# get new challenge
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-authz", {
code, result, headers = _send_signed_request(acme_config["new-authz"], {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# make and install DNS resource record
log.info("Create DNS RR...")
log.info("Create and install DNS TXT challenge resource.")
challenge = [c for c in json.loads(result.decode("utf8"))["challenges"] if c["type"] == "dns-01"][0]
token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"])
keyauthorization = "{0}.{1}".format(token, thumbprint)
......@@ -140,16 +181,15 @@ def get_crt(config, log=LOGGER):
try:
_update_dns(dnsrr_set, "add")
except dns.exception.DNSException as dnsexception:
raise ValueError("Error updating DNS: {0} : {1}".format(type(dnsexception).__name__, str(dnsexception)))
raise ValueError("Error updating DNS records: {0} : {1}".format(type(dnsexception).__name__, str(dnsexception)))
# notify challenge are met
log.info("Wait {0} then start self challenge checks.".format(config["acmednstiny"].getint("CheckChallengeDelay")))
time.sleep(config["acmednstiny"].getint("CheckChallengeDelay"))
log.info("Self challenge check...")
challenge_verified = False
number_check_fail = 0
number_check_fail = 1
while challenge_verified is False:
try:
log.info('Try {0}: Check ressource with value "{1}" exits on nameservers: {2}'.format(number_check_fail+1, keydigest64, resolver.nameservers))
log.info('Try {0}: Check ressource with value "{1}" exits on nameservers: {2}'.format(number_check_fail, keydigest64, resolver.nameservers))
challenges = resolver.query(dnsrr_domain, rdtype="TXT")
for response in challenges.rrset:
log.info(".. Found value {0}".format(response.to_text()))
......@@ -157,13 +197,13 @@ def get_crt(config, log=LOGGER):
except dns.exception.DNSException as dnsexception:
log.info("Info: retry, because a DNS error occurred while checking challenge: {0} : {1}".format(type(dnsexception).__name__, dnsexception))
finally:
if number_check_fail > 10:
if number_check_fail >= 10:
raise ValueError("Error checking challenge, value not found: {0}".format(keydigest64))
if challenge_verified is False:
number_check_fail = number_check_fail + 1
time.sleep(2)
log.info("Ask CA server to perform check...")
log.info("Ask ACME server to perform checks.")
code, result, headers = _send_signed_request(challenge["uri"], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
......@@ -171,7 +211,7 @@ def get_crt(config, log=LOGGER):
if code != 202:
raise ValueError("Error triggering challenge: {0} {1}".format(code, result))
# wait for challenge to be verified
log.info("Waiting challenge to be verified.")
try:
while True:
try:
......@@ -183,7 +223,7 @@ def get_crt(config, log=LOGGER):
if challenge_status["status"] == "pending":
time.sleep(2)
elif challenge_status["status"] == "valid":
log.info("{0} verified!".format(domain))
log.info("Domain {0} verified!".format(domain))
break
else:
raise ValueError("{0} challenge did not pass: {1}".format(
......@@ -191,10 +231,9 @@ def get_crt(config, log=LOGGER):
finally:
_update_dns(dnsrr_set, "delete")
# get the new certificate
log.info("Signing certificate...")
log.info("Ask to sign certificate.")
csr_der = _openssl("req", ["-in", config["acmednstiny"]["CSRFile"], "-outform", "DER"])
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-cert", {
code, result, headers = _send_signed_request(acme_config["new-cert"], {
"resource": "new-cert",
"csr": _b64(csr_der),
})
......@@ -203,19 +242,14 @@ def get_crt(config, log=LOGGER):
certificate = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
# get the parent certificate which had created this one
linkheader = [link.strip() for link in dict(headers)["Link"].split(',')]
certificate_parent_url = [re.match(r'<(?P<url>.*)>.*;rel=(up|("([a-z][a-z0-9\.\-]*\s+)*up[\s"]))', link).groupdict()
for link in linkheader][0]["url"]
certificate_parent_url = _get_url_link(headers, 'up')
resp = urlopen(certificate_parent_url)
code = resp.getcode()
result = resp.read()
if code not in [200, 201]:
if resp.getcode() not in [200, 201]:
raise ValueError("Error getting certificate chain from {0}: {1} {2}".format(
certificate_parent_url, code, result))
certificate_parent = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
certificate_parent_url, code, resp.read()))
certificate_parent = "\n".join(textwrap.wrap(base64.b64encode(resp.read()).decode("utf8"), 64))
# return signed certificate!
log.info("Certificate signed!")
log.info("Certificate signed and received.")
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n-----BEGIN CERTIFICATE-----\n{1}\n-----END CERTIFICATE-----\n""".format(
certificate, certificate_parent)
......@@ -227,7 +261,7 @@ def main(argv):
chain from Let's Encrypt using the ACME protocol and its DNS verification.
It will need to have access to your private account key and dns server
so PLEASE READ THROUGH IT!
It's only ~250 lines, so it won't take long.
It's around 300 lines, so it won't take long.
===Example Usage===
python3 acme_dns_tiny.py ./example.ini > chain.crt
......@@ -240,12 +274,12 @@ def main(argv):
args = parser.parse_args(argv)
config = ConfigParser()
config.read_dict({"acmednstiny": {"CAUrl": "https://acme-staging.api.letsencrypt.org",
"CheckChallengeDelay": 2},
config.read_dict({"acmednstiny": {"ACMEDirectory": "https://acme-staging.api.letsencrypt.org/directory",
"CheckChallengeDelay": 2},
"DNS": {"Port": "53"}})
config.read(args.configfile)
if (set(["accountkeyfile", "csrfile", "caurl", "checkchallengedelay"]) - set(config.options("acmednstiny"))
if (set(["accountkeyfile", "csrfile", "acmedirectory", "checkchallengedelay"]) - set(config.options("acmednstiny"))
or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring"))
or set(["zone", "host", "port"]) - set(config.options("DNS"))):
raise ValueError("Some required settings are missing.")
......
......@@ -2,11 +2,15 @@
# Required readable ACME account key
AccountKeyFile = account.key
# Required readable CSR file
CSRFile = ../adtdev/adtdev.adorsaz.ch.csr
# Optional CA url (default: https://acme-staging.api.letsencrypt.org)
CAUrl = https://acme-staging.api.letsencrypt.org
CSRFile = domain.csr
# Optional ACME directory url (default: https://acme-staging.api.letsencrypt.org/directory)
ACMEDirectory = https://acme-staging.api.letsencrypt.org/directory
# Optional time in seconds to wait between DNS update and challenge check (default: 3)
CheckChallengeDelay = 3
# Optional Contact info to send to the ACME provider
MailContact = mail@example.com
# Note that Let's Encrypt servers disallow use of phone numbers
PhoneContact = +11111111111
[TSIGKeyring]
# Required TSIG key name
......@@ -14,7 +18,7 @@ KeyName = host-example
# Required TSIG key value in base64
KeyValue = XXXXXXXXXXX==
# Required TSIG algorithm
Algorithm = hmac-sha256
Algorithm = hmac-sha256
[DNS]
# Required name of zone to update
......
import subprocess, os, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
CAURL = os.getenv("GITLABCI_CAURL", "https://acme-staging.api.letsencrypt.org")
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
......@@ -9,9 +10,9 @@ LOGGER.setLevel(logging.INFO)
def delete_account(accountkeypath, log=LOGGER):
# helper function base64 encode for jose spec
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").replace("=", "")
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
......@@ -26,7 +27,7 @@ def delete_account(accountkeypath, log=LOGGER):
def _send_signed_request(url, payload):
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(CAURL + "/directory").headers["Replay-Nonce"]
protected["nonce"] = urlopen(ACMEDirectory).headers["Replay-Nonce"]
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
......@@ -36,9 +37,9 @@ def delete_account(accountkeypath, log=LOGGER):
})
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
return resp.getcode(), resp.read(), resp.getheaders()
except HTTPError as httperror:
return httperror.getcode(), httperror.read(), httperror.getheaders()
# parse account key to get public key
log.info("Parsing account key...")
......@@ -57,12 +58,28 @@ def delete_account(accountkeypath, log=LOGGER):
},
}
# send request to delete account key
# get ACME server configuration from the directory
directory = urlopen(ACMEDirectory)
acme_config = json.loads(directory.read().decode("utf8"))
log.info("Register account to get account URL.")
code, result, headers = _send_signed_request(acme_config["new-reg"], {
"resource": "new-reg"
})
if code == 201:
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
elif code == 409:
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
log.info("Delete account...")
code, result = _send_signed_request(CAURL + "/acme/new-reg", {
code, result, headers = _send_signed_request(account_url, {
"resource": "reg",
"delete": True,
})
if code != 200:
if code not in [200,202]:
raise ValueError("Error deleting account key: {0} {1}".format(code, result))
log.info("Account key deleted !")
......@@ -4,7 +4,7 @@ from subprocess import Popen
# domain with server.py running on it for testing
DOMAIN = os.getenv("GITLABCI_DOMAIN")
CAURL = os.getenv("GITLABCI_CAURL", "https://acme-staging.api.letsencrypt.org")
ACMEDIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
CHALLENGEDELAY = os.getenv("GITLABCI_CHALLENGEDELAY", "3")
DNSHOST = os.getenv("GITLABCI_DNSHOST")
DNSHOSTIP = os.getenv("GITLABCI_DNSHOSTIP")
......@@ -48,8 +48,10 @@ def gen_config():
# Default test configuration
config = configparser.ConfigParser()
config.read("./example.ini".format(DOMAIN))
config["acmednstiny"]["CAUrl"] = CAURL
config["acmednstiny"]["ACMEDirectory"] = ACMEDIRECTORY
config["acmednstiny"]["CheckChallengeDelay"] = CHALLENGEDELAY
config["acmednstiny"]["MailContact"] = "mail@example.com"
del config["acmednstiny"]["PhoneContact"]
config["TSIGKeyring"]["KeyName"] = TSIGKEYNAME
config["TSIGKeyring"]["KeyValue"] = TSIGKEYVALUE
config["TSIGKeyring"]["Algorithm"] = TSIGALGORITHM
......@@ -98,6 +100,7 @@ def gen_config():
config.write(configfile)
return {
# configs
"goodCName": goodCName,
"dnsHostIP": dnsHostIP,
"goodSAN": goodSAN,
......@@ -105,11 +108,13 @@ def gen_config():
"accountAsDomain": accountAsDomain,
"invalidTSIGName": invalidTSIGName,
"missingDNS": missingDNS,
"key": {"accountkey": account_key,
"weakkey": weak_key,
"domainkey": domain_key},
"csr" : {"domaincsr": domain_csr,
"sancsr": san_csr,
"accountcsr": account_csr}
# keys (returned to keep files on system)
"accountkey": account_key,
"weakkey": weak_key,
"domainkey": domain_key,
# csr (returned to keep files on system)
"domaincsr": domain_csr,
"sancsr": san_csr,
"accountcsr": account_csr
}
......@@ -2,4 +2,4 @@ coverage
logassert
argparse
configparser
dnspython3
\ No newline at end of file
dnspython>=1.15
......@@ -6,11 +6,24 @@ from .config_maker import gen_config
from .acme_account_delete import delete_account
import logassert
CONFIGS = gen_config()
class TestModule(unittest.TestCase):
"Tests for acme_dns_tiny.get_crt()"
@classmethod
def setUpClass(self):
self.configs = gen_config()
super(TestModule, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# delete account key registration at end of tests
delete_account(self.configs["accountkey"].name)
# close temp files correctly
for tmpfile in self.configs:
self.configs[tmpfile].close()
super(TestModule, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_dns_tiny_logger')
......@@ -18,7 +31,7 @@ class TestModule(unittest.TestCase):
""" Successfully issue a certificate via common name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([CONFIGS['goodCName'].name])
result = acme_dns_tiny.main([self.configs['goodCName'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
......@@ -31,7 +44,7 @@ class TestModule(unittest.TestCase):
""" When DNS Host is an IP, DNS resolution have to fail without error """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([CONFIGS['dnsHostIP'].name])
result = acme_dns_tiny.main([self.configs['dnsHostIP'].name])
self.assertLoggedInfo("DNS IPv4 record not found for configured dns host.")
self.assertLoggedInfo("DNS IPv4 and IPv6 records not found for configured dns host.")
sys.stdout.seek(0)
......@@ -46,7 +59,7 @@ class TestModule(unittest.TestCase):
""" Successfully issue a certificate via subject alt name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([CONFIGS['goodSAN'].name])
result = acme_dns_tiny.main([self.configs['goodSAN'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
......@@ -58,7 +71,7 @@ class TestModule(unittest.TestCase):
def test_success_cli(self):
""" Successfully issue a certificate via command line interface """
crt, err = Popen([
"python3", "acme_dns_tiny.py", CONFIGS['goodCName'].name
"python3", "acme_dns_tiny.py", self.configs['goodCName'].name
], stdout=PIPE, stderr=PIPE).communicate()
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
......@@ -68,7 +81,7 @@ class TestModule(unittest.TestCase):
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
try:
result = acme_dns_tiny.main([CONFIGS['weakKey'].name])
result = acme_dns_tiny.main([self.configs['weakKey'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
......@@ -77,7 +90,7 @@ class TestModule(unittest.TestCase):
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
try:
result = acme_dns_tiny.main([CONFIGS['accountAsDomain'].name])
result = acme_dns_tiny.main([self.configs['accountAsDomain'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
......@@ -86,7 +99,7 @@ class TestModule(unittest.TestCase):
def test_failure_dns_update_tsigkeyname(self):
""" Fail to update DNS records by invalid TSIG Key name """
try:
result = acme_dns_tiny.main([CONFIGS['invalidTSIGName'].name])
result = acme_dns_tiny.main([self.configs['invalidTSIGName'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
......@@ -95,29 +108,11 @@ class TestModule(unittest.TestCase):
def test_failure_notcompleted_configuration(self):
""" Configuration file have to be completed """
try:
result = acme_dns_tiny.main([CONFIGS['missingDNS'].name])
result = acme_dns_tiny.main([self.configs['missingDNS'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Some required settings are missing.", result.args[0])
if __name__ == "__main__":
try:
unittest.main()
finally:
# delete account key registration at end of tests
delete_account(CONFIGS["key"]["accountkey"].name)
# close temp files correctly
CONFIGS["goodCName"].close()
CONFIGS["dnsHostIP"].close()
CONFIGS["goodSAN"].close()
CONFIGS["weakKey"].close()
CONFIGS["accountAsDomain"].close()
CONFIGS["invalidTSIGName"].close()
CONFIGS["missingDNS"].close()
CONFIGS["key"]["accountkey"].close()
CONFIGS["key"]["weakkey"].close()
CONFIGS["key"]["domainkey"].close()
CONFIGS["csr"]["domaincsr"].close()
CONFIGS["csr"]["sancsr"].close()
CONFIGS["csr"]["accountcsr"].close()
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment