Commit 4e83caf6 authored by Adrien Dorsaz's avatar Adrien Dorsaz

Merge branch 'acme-dns-tiny-codingstyle' into 'master'

Acme dns tiny codingstyle

See merge request !9
parents 1882dd3d f3a1682d
Pipeline #140 passed with stage
in 2 minutes and 50 seconds
after_script:
- sleep 10
jessie:
image: adt-jessie_dnspython3_1.11
before_script:
- pip3 install -r tests/requirements.txt
- pip3 install --upgrade -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
- coverage html
jessie_backport:
image: adt-jessie_dnspython3_1.15-bpo
before_script:
- pip3 install -r tests/requirements.txt
- pip3 install --upgrade -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
- coverage html
#!/usr/bin/env python3
import argparse, subprocess, json, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
import os, argparse, subprocess, json, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
import dns.resolver, dns.tsigkeyring, dns.update
from configparser import ConfigParser
from urllib.request import urlopen
from urllib.error import HTTPError
LOGGER = logging.getLogger('acme_dns_tiny_logger')
LOGGER = logging.getLogger('acme_dns_tiny')
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
......@@ -72,27 +72,22 @@ def get_crt(config, log=LOGGER):
log.info("Prepare DNS keyring and resolver.")
keyring = dns.tsigkeyring.from_text({config["TSIGKeyring"]["KeyName"]: config["TSIGKeyring"]["KeyValue"]})
resolver = dns.resolver.Resolver(configure=False)
resolver.retry_servfail = True
nameserver = []
try:
nameserver = [ipv4_rrset.to_text() for ipv4_rrset in dns.resolver.query(config["DNS"]["Host"], rdtype="A")]
nameserver = nameserver + [ipv6_rrset.to_text() for ipv6_rrset in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")]
except dns.exception.DNSException as e:
log.info("DNS IPv4 record not found for configured dns host.")
finally:
try:
nameserver = nameserver + [ipv6_rrset.to_text() for ipv6_rrset in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")]
except dns.exception.DNSException as e:
log.info("DNS IPv4 and IPv6 records not found for configured dns host. Try to keep original name.")
finally:
if not nameserver:
nameserver = [config["DNS"]["Host"]]
resolver = dns.resolver.Resolver(configure=False)
log.info("A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if exists or directly the DNS Host configuration.")
if not nameserver:
nameserver = [config["DNS"]["Host"]]
resolver.nameservers = nameserver
resolver.retry_servfail = True
log.info("Parsing account key looking for public key.")
accountkey = _openssl("rsa", ["-in", config["acmednstiny"]["AccountKeyFile"], "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
......@@ -114,7 +109,7 @@ def get_crt(config, log=LOGGER):
common_name = re.search(r"Subject:.*? CN=([^\s,;/]+)", csr)
if common_name is not None:
domains.add(common_name.group(1))
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \n +([^\n]+)\n", csr, re.MULTILINE | re.DOTALL)
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: \r?\n +([^\r\n]+)\r?\n", csr, re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
......@@ -133,11 +128,12 @@ def get_crt(config, log=LOGGER):
reg_info["contact"].append(reg_phone)
if len(reg_info["contact"]) == 0:
del reg_info["contact"]
code, result, headers = _send_signed_request(acme_config["new-reg"], reg_info)
if code == 201:
reg_received_contact = reg_info.get("contact")
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
reg_received_contact = reg_info.get("contact")
elif code == 409:
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
......@@ -153,8 +149,8 @@ def get_crt(config, log=LOGGER):
if current_terms is None:
current_terms = _get_url_link(headers, 'terms-of-service')
if (reg_info.get("agreement") != current_terms
or (config["acmednstiny"].get("MailContact") is not None and reg_mailto not in reg_received_contact)
or (config["acmednstiny"].get("PhoneContact") is not None and reg_phone not in reg_received_contact)):
or reg_mailto not in reg_received_contact
or reg_phone not in reg_received_contact):
reg_info["resource"] = "reg"
reg_info["agreement"] = current_terms
code, result, headers = _send_signed_request(account_url, reg_info)
......@@ -207,6 +203,7 @@ def get_crt(config, log=LOGGER):
if challenge_verified is False:
number_check_fail = number_check_fail + 1
time.sleep(2)
log.info("Ask ACME server to perform checks.")
code, result, headers = _send_signed_request(challenge["uri"], {
"resource": "challenge",
......@@ -243,7 +240,7 @@ def get_crt(config, log=LOGGER):
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
certificate = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
certificate = os.linesep.join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
# get the parent certificate which had created this one
certificate_parent_url = _get_url_link(headers, 'up')
......@@ -251,27 +248,29 @@ def get_crt(config, log=LOGGER):
if resp.getcode() not in [200, 201]:
raise ValueError("Error getting certificate chain from {0}: {1} {2}".format(
certificate_parent_url, code, resp.read()))
certificate_parent = "\n".join(textwrap.wrap(base64.b64encode(resp.read()).decode("utf8"), 64))
intermediary_certificate = os.linesep.join(textwrap.wrap(base64.b64encode(resp.read()).decode("utf8"), 64))
chainlist = ["-----BEGIN CERTIFICATE-----{0}{1}{0}-----END CERTIFICATE-----{0}".format(
os.linesep, cert) for cert in [certificate, intermediary_certificate]]
log.info("Certificate signed and received.")
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n-----BEGIN CERTIFICATE-----\n{1}\n-----END CERTIFICATE-----\n""".format(
certificate, certificate_parent)
return "".join(chainlist)
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
This script automates the process of getting a signed TLS certificate
chain from Let's Encrypt using the ACME protocol and its DNS verification.
It will need to have access to your private account key and dns server
so PLEASE READ THROUGH IT!
It's around 300 lines, so it won't take long.
description="""
This script automates the process of getting a signed TLS certificate
chain from Let's Encrypt using the ACME protocol and its DNS verification.
It will need to have access to your private account key and dns server
so PLEASE READ THROUGH IT!
It's around 300 lines, so it won't take long.
===Example Usage===
python3 acme_dns_tiny.py ./example.ini > chain.crt
See example.ini file to configure correctly this script.
===================
""")
===Example Usage===
python3 acme_dns_tiny.py ./example.ini > chain.crt
See example.ini file to configure correctly this script.
===================
"""
)
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
parser.add_argument("configfile", help="path to your configuration file")
......
coverage
logassert
argparse
configparser
configparser
\ No newline at end of file
import unittest, sys, os
from subprocess import Popen, PIPE
from io import StringIO
import unittest, os
import acme_dns_tiny
from tests.config_factory import generate_acme_account_delete_config
import tools.acme_account_delete
import logassert
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
......@@ -23,14 +20,13 @@ class TestACMEAccountDelete(unittest.TestCase):
self.accountkey.close()
super(TestACMEAccountDelete, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_account_delete')
def test_success_account_delete(self):
""" Test success account key delete """
tools.acme_account_delete.main(["--account-key", self.accountkey.name,
"--acme-directory", ACMEDirectory])
self.assertLoggedInfo("Account key deleted !")
with self.assertLogs(level='INFO') as accountdeletelog:
tools.acme_account_delete.main(["--account-key", self.accountkey.name,
"--acme-directory", ACMEDirectory])
self.assertIn("INFO:acme_account_delete:Account key deleted !",
accountdeletelog.output)
if __name__ == "__main__":
unittest.main()
import unittest, sys, os
from subprocess import Popen, PIPE
from io import StringIO
import unittest, os
import acme_dns_tiny
from tests.config_factory import generate_acme_account_rollover_config
from tools.acme_account_delete import account_delete
import tools.acme_account_rollover
import logassert
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
......@@ -27,15 +24,14 @@ class TestACMEAccountRollover(unittest.TestCase):
self.configs[tmpfile].close()
super(TestACMEAccountRollover, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_account_rollover')
def test_success_account_rollover(self):
""" Test success account key rollover """
tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'].name,
"--new", self.configs['newaccountkey'].name,
"--acme-directory", ACMEDirectory])
self.assertLoggedInfo("Account keys rolled over !")
with self.assertLogs(level='INFO') as accountrolloverlog:
tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'].name,
"--new", self.configs['newaccountkey'].name,
"--acme-directory", ACMEDirectory])
self.assertIn("INFO:acme_account_rollover:Account keys rolled over !",
accountrolloverlog.output)
if __name__ == "__main__":
unittest.main()
import unittest, sys, os
from subprocess import Popen, PIPE
import unittest, sys, os, subprocess
from io import StringIO
import dns.version
import acme_dns_tiny
from tests.config_factory import generate_acme_dns_tiny_config
from tools.acme_account_delete import account_delete
import logassert
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
......@@ -13,7 +12,11 @@ class TestACMEDNSTiny(unittest.TestCase):
@classmethod
def setUpClass(self):
print("Init acme_dns_tiny with python modules:")
print(" - python: {0}".format(sys.version))
print(" - dns python: {0}".format(dns.version.version))
self.configs = generate_acme_dns_tiny_config()
sys.stdout.flush()
super(TestACMEDNSTiny, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
......@@ -26,95 +29,102 @@ class TestACMEDNSTiny(unittest.TestCase):
self.configs[tmpfile].close()
super(TestACMEDNSTiny, self).tearDownClass()
def setUp(self):
logassert.setup(self, 'acme_dns_tiny_logger')
# helper function to run openssl command
def openssl(self, command, options, communicate=None):
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = openssl.communicate(communicate)
if openssl.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
return out.decode("utf8")
# helper function to valid success by making assertion on returned certificate chain
def assertCertificateChain(self, certificateChain):
# Output have to contains two certiicates
certlist = certificateChain.split("-----BEGIN CERTIFICATE-----")
self.assertEqual(3, len(certlist))
self.assertEqual('', certlist[0])
self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[1])
self.assertIn("-----END CERTIFICATE-----{0}".format(os.linesep), certlist[2])
# Use openssl to check validity of chain and simple test of readability
readablecertchain = self.openssl("x509", ["-text", "-noout"], certificateChain.encode("utf8"))
self.assertIn("Issuer", readablecertchain)
def test_success_cn(self):
""" Successfully issue a certificate via common name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([self.configs['goodCName'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
acme_dns_tiny.main([self.configs['goodCName'].name])
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
self.assertCertificateChain(certchain)
def test_success_dnshost_ip(self):
""" When DNS Host is an IP, DNS resolution have to fail without error """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([self.configs['dnsHostIP'].name])
self.assertLoggedInfo("DNS IPv4 record not found for configured dns host.")
self.assertLoggedInfo("DNS IPv4 and IPv6 records not found for configured dns host.")
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
with self.assertLogs(level='INFO') as adnslog:
acme_dns_tiny.main([self.configs['dnsHostIP'].name])
self.assertIn("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if exists or directly the DNS Host configuration.",
adnslog.output)
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
self.assertCertificateChain(certchain)
def test_success_san(self):
""" Successfully issue a certificate via subject alt name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_dns_tiny.main([self.configs['goodSAN'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
acme_dns_tiny.main([self.configs['goodSAN'].name])
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
self.assertCertificateChain(certchain)
def test_success_cli(self):
""" Successfully issue a certificate via command line interface """
crt, err = Popen([
certout, err = subprocess.Popen([
"python3", "acme_dns_tiny.py", self.configs['goodCName'].name
], stdout=PIPE, stderr=PIPE).communicate()
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
certchain = certout.decode("utf8")
self.assertCertificateChain(certchain)
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
try:
result = acme_dns_tiny.main([self.configs['weakKey'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Key too small", result.args[0])
self.assertRaisesRegex(ValueError,
"key too small",
acme_dns_tiny.main, [self.configs['weakKey'].name])
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
try:
result = acme_dns_tiny.main([self.configs['accountAsDomain'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Certificate public key must be different than account key", result.args[0])
self.assertRaisesRegex(ValueError,
"certificate public key must be different than account key",
acme_dns_tiny.main, [self.configs['accountAsDomain'].name])
def test_failure_dns_update_tsigkeyname(self):
""" Fail to update DNS records by invalid TSIG Key name """
try:
result = acme_dns_tiny.main([self.configs['invalidTSIGName'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Error updating DNS", result.args[0])
self.assertRaisesRegex(ValueError,
"Error updating DNS",
acme_dns_tiny.main, [self.configs['invalidTSIGName'].name])
def test_failure_notcompleted_configuration(self):
""" Configuration file have to be completed """
try:
result = acme_dns_tiny.main([self.configs['missingDNS'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Some required settings are missing.", result.args[0])
self.assertRaisesRegex(ValueError,
"Some required settings are missing\.",
acme_dns_tiny.main, [self.configs['missingDNS'].name])
if __name__ == "__main__":
unittest.main()
import argparse, subprocess, json, base64, binascii, re, copy, logging
import os, argparse, subprocess, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
......@@ -45,7 +45,7 @@ def account_delete(accountkeypath, acme_directory, log=LOGGER):
log.info("Parsing account key...")
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
......
import argparse, subprocess, os, json, base64, binascii, hashlib, re, copy, logging
import os, argparse, subprocess, os, json, base64, binascii, hashlib, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
......@@ -24,7 +24,7 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
def _jws_header(accountkeypath):
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment