Commit 436d055b authored by Adrien Dorsaz's avatar Adrien Dorsaz

apply pep8 hints and use brackets to split long lines

parent 8d346a56
#!/usr/bin/env python3
#pylint: disable=multiple-imports
# pylint: disable=multiple-imports
"""ACME client to met DNS challenge and receive TLS certificate"""
import argparse, base64, binascii, configparser, copy, hashlib, json, logging
import re, sys, subprocess, time
......@@ -8,10 +8,12 @@ import requests, dns.resolver, dns.tsigkeyring, dns.update
LOGGER = logging.getLogger('acme_dns_tiny')
LOGGER.addHandler(logging.StreamHandler())
def _base64(text):
""""Encodes string as base64 as specified in the ACME RFC."""
"""Encodes string as base64 as specified in the ACME RFC."""
return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
def _openssl(command, options, communicate=None):
"""Run openssl command line and raise IOError on non-zero return."""
openssl = subprocess.Popen(["openssl", command] + options,
......@@ -22,9 +24,10 @@ def _openssl(command, options, communicate=None):
raise IOError("OpenSSL Error: {0}".format(err))
return out
#pylint: disable=too-many-locals,too-many-branches,too-many-statements
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def get_crt(config, log=LOGGER):
"""Get ACME certificate by resolving DNS challenge"""
"""Get ACME certificate by resolving DNS challenge."""
def _update_dns(rrset, action):
"""Updates DNS resource by adding or deleting resource."""
......@@ -43,7 +46,7 @@ def get_crt(config, log=LOGGER):
def _send_signed_request(url, payload, extra_headers=None):
"""Sends signed requests to ACME server."""
nonlocal nonce
if payload == "": # on POST-as-GET, final payload has to be just empty string
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _base64(json.dumps(payload).encode("utf8"))
......@@ -93,7 +96,7 @@ def get_crt(config, log=LOGGER):
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
if len(domains) == 0: #pylint: disable=len-as-condition
if len(domains) == 0: # pylint: disable=len-as-condition
raise ValueError("Didn't find any domain to validate in the provided CSR.")
log.info("Configure DNS client tools.")
......@@ -109,8 +112,8 @@ def get_crt(config, log=LOGGER):
nameserver = nameserver + [ipv6_rrset.to_text() for ipv6_rrset
in dns.resolver.query(config["DNS"]["Host"], rdtype="AAAA")]
except dns.exception.DNSException:
log.info("A and/or AAAA DNS resources not found for configured dns host: we will use either\
resource found if one exists or directly the DNS Host configuration.")
log.info(("A and/or AAAA DNS resources not found for configured dns host: we will use "
"either resource found if one exists or directly the DNS Host configuration."))
if not nameserver:
nameserver = [config["DNS"]["Host"]]
resolver.nameservers = nameserver
......@@ -143,8 +146,8 @@ def get_crt(config, log=LOGGER):
account_request = {}
if terms_service:
account_request["termsOfServiceAgreed"] = True
log.warning("Terms of service exist and will be automatically agreed if possible, \
you should read them: %s", terms_service)
log.warning(("Terms of service exist and will be automatically agreed if possible, "
"you should read them: %s"), terms_service)
account_request["contact"] = config["acmednstiny"].get("Contacts", "").split(';')
if account_request["contact"] == [""]:
del account_request["contact"]
......@@ -184,8 +187,8 @@ def get_crt(config, log=LOGGER):
.format(order))
elif (http_response.status_code == 403
and order["type"] == "urn:ietf:params:acme:error:userActionRequired"):
raise ValueError("Order creation failed ({0}). Read Terms of Service ({1}), \
then follow your CA instructions: {2}"
raise ValueError(("Order creation failed ({0}). Read Terms of Service ({1}), then follow "
"your CA instructions: {2}")
.format(order["detail"], http_response.headers['Link'], order["instance"]))
else:
raise ValueError("Error getting new Order: {0} {1}"
......@@ -211,15 +214,15 @@ def get_crt(config, log=LOGGER):
keyauthorization = "{0}.{1}".format(token, jwk_thumbprint)
keydigest64 = _base64(hashlib.sha256(keyauthorization.encode("utf8")).digest())
dnsrr_domain = "_acme-challenge.{0}.".format(domain)
try: # a CNAME resource can be used for advanced TSIG configuration
# Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed)
try: # a CNAME resource can be used for advanced TSIG configuration
# Note: the CNAME target has to be of "non-CNAME" type (recursion isn't managed)
dnsrr_domain = [response.to_text() for response
in resolver.query(dnsrr_domain, rdtype="CNAME")][0]
log.info(" - A CNAME resource has been found for this domain, will install TXT on %s",
dnsrr_domain)
except dns.exception.DNSException as dnsexception:
log.debug(" - Not any CNAME resource has been found for this domain (%s), will install\
TXT directly on %s", dnsrr_domain, type(dnsexception).__name__)
log.debug((" - Not any CNAME resource has been found for this domain (%s), will "
"install TXT directly on %s"), dnsrr_domain, type(dnsexception).__name__)
dnsrr_set = dns.rrset.from_text(dnsrr_domain, config["DNS"].getint("TTL"),
"IN", "TXT", '"{0}"'.format(keydigest64))
try:
......@@ -235,8 +238,8 @@ def get_crt(config, log=LOGGER):
number_check_fail = 1
while challenge_verified is False:
try:
log.debug('Self test (try: %s): Check resource with value "%s" exits on\
nameservers: %s', number_check_fail, keydigest64, resolver.nameservers)
log.debug(('Self test (try: %s): Check resource with value "%s" exits on '
'nameservers: %s'), number_check_fail, keydigest64, resolver.nameservers)
for response in resolver.query(dnsrr_domain, rdtype="TXT").rrset:
log.debug(" - Found value %s", response.to_text())
challenge_verified = (challenge_verified
......@@ -312,6 +315,7 @@ def get_crt(config, log=LOGGER):
log.info("Certificate signed and chain received: %s", order["certificate"])
return http_response.text
def main(argv):
"""Parse arguments and get certificate."""
parser = argparse.ArgumentParser(
......@@ -353,5 +357,6 @@ from the configuration file.")
signed_crt = get_crt(config, LOGGER)
sys.stdout.write(signed_crt)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
......@@ -18,6 +18,7 @@ TSIGKEYVALUE = os.getenv("GITLABCI_TSIGKEYVALUE")
TSIGALGORITHM = os.getenv("GITLABCI_TSIGALGORITHM")
CONTACT = os.getenv("GITLABCI_CONTACT")
def generate_config():
"""Generate basic acme-dns-tiny configuration"""
# Account key
......@@ -50,6 +51,7 @@ def generate_config():
return account_key.name, domain_key.name, domain_csr.name, parser
def generate_acme_dns_tiny_unit_test_config():
"""Genereate acme_dns_tiny configurations used for unit tests"""
# Configuration missing DNS section
......@@ -63,7 +65,8 @@ def generate_acme_dns_tiny_unit_test_config():
return {"missing_dns": missing_dns.name}
def generate_acme_dns_tiny_config(): #pylint: disable=too-many-locals,too-many-statements
def generate_acme_dns_tiny_config(): # pylint: disable=too-many-locals,too-many-statements
"""Generate acme_dns_tiny configuration with account and domain keys"""
# Simple configuration with good options
account_key, domain_key, _, config = generate_config()
......@@ -132,7 +135,6 @@ def generate_acme_dns_tiny_config(): #pylint: disable=too-many-locals,too-many-s
with open(good_san.name, 'w') as configfile:
config.write(configfile)
# Configuration with CSR containing a wildcard domain inside subjetcAltName
account_key, domain_key, domain_csr, config = generate_config()
......@@ -198,6 +200,7 @@ def generate_acme_dns_tiny_config(): #pylint: disable=too-many-locals,too-many-s
"cname_csr": cname_csr,
}
def generate_acme_account_rollover_config():
"""Generate config for acme_account_rollover script"""
# Old account key is directly created by the config generator
......@@ -219,6 +222,7 @@ def generate_acme_account_rollover_config():
"new_account_key": new_account_key.name
}
def generate_acme_account_deactivate_config():
"""Generate config for acme_account_deactivate script"""
# Account key is created by the by the config generator
......
......@@ -9,8 +9,9 @@ import tools.acme_account_deactivate
ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2",
"https://acme-staging-v02.api.letsencrypt.org/directory")
class TestACMEAccountDeactivate(unittest.TestCase):
"Tests for acme_account_deactivate"
"""Tests for acme_account_deactivate."""
@classmethod
def setUpClass(cls):
......@@ -25,7 +26,7 @@ class TestACMEAccountDeactivate(unittest.TestCase):
super(TestACMEAccountDeactivate, cls).setUpClass()
# To clean ACME staging server and close correctly temporary files
#pylint: disable=bare-except
# pylint: disable=bare-except
@classmethod
def tearDownClass(cls):
# Remove temporary files
......@@ -53,5 +54,6 @@ class TestACMEAccountDeactivate(unittest.TestCase):
self.assertIn("INFO:acme_account_deactivate:The account has been deactivated.",
accountdeactivatelog.output)
if __name__ == "__main__": # pragma: no cover
unittest.main()
......@@ -10,8 +10,9 @@ import tools.acme_account_rollover
ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2",
"https://acme-staging-v02.api.letsencrypt.org/directory")
class TestACMEAccountRollover(unittest.TestCase):
"Tests for acme_account_rollover"
"""Tests for acme_account_rollover."""
@classmethod
def setUpClass(cls):
......@@ -20,7 +21,7 @@ class TestACMEAccountRollover(unittest.TestCase):
super(TestACMEAccountRollover, cls).setUpClass()
# To clean ACME staging server and close correctly temporary files
#pylint: disable=bare-except
# pylint: disable=bare-except
@classmethod
def tearDownClass(cls):
# Remove temporary files
......@@ -51,13 +52,13 @@ class TestACMEAccountRollover(unittest.TestCase):
super(TestACMEAccountRollover, cls).tearDownClass()
def test_success_account_rollover(self):
""" Test success account key rollover """
""" Test success account key rollover."""
with self.assertLogs(level='INFO') as accountrolloverlog:
tools.acme_account_rollover.main(["--current", self.configs['old_account_key'],
"--new", self.configs['new_account_key'],
"--acme-directory", ACME_DIRECTORY])
self.assertIn("INFO:acme_account_rollover:Keys rolled over.",
accountrolloverlog.output)
self.assertIn("INFO:acme_account_rollover:Keys rolled over.", accountrolloverlog.output)
if __name__ == "__main__": # pragma: no cover
unittest.main()
......@@ -13,8 +13,9 @@ from tools.acme_account_deactivate import account_deactivate
ACME_DIRECTORY = os.getenv("GITLABCI_ACMEDIRECTORY_V2",
"https://acme-staging-v02.api.letsencrypt.org/directory")
def _openssl(command, options, communicate=None):
"""Helper function to run openssl command"""
"""Helper function to run openssl command."""
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
......@@ -23,8 +24,9 @@ def _openssl(command, options, communicate=None):
raise IOError("OpenSSL Error: {0}".format(err))
return out.decode("utf8")
class TestACMEDNSTiny(unittest.TestCase):
"Tests for acme_dns_tiny.get_crt()"
"""Tests for acme_dns_tiny.get_crt()."""
@classmethod
def setUpClass(cls):
......@@ -36,7 +38,7 @@ class TestACMEDNSTiny(unittest.TestCase):
super(TestACMEDNSTiny, cls).setUpClass()
# To clean ACME staging server and close correctly temporary files
#pylint: disable=bare-except
# pylint: disable=bare-except
@classmethod
def tearDownClass(cls):
# close temp files correctly
......@@ -77,7 +79,7 @@ class TestACMEDNSTiny(unittest.TestCase):
self.assertIn("Issuer", readablecertchain)
def test_success_cn(self):
""" Successfully issue a certificate via common name """
"""Successfully issue a certificate via common name."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -90,7 +92,7 @@ class TestACMEDNSTiny(unittest.TestCase):
self._assert_certificate_chain(certchain)
def test_success_cn_without_contacts(self):
""" Successfully issue a certificate via CN, but without Contacts field """
"""Successfully issue a certificate via CN, but without Contacts field."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -103,7 +105,7 @@ class TestACMEDNSTiny(unittest.TestCase):
self._assert_certificate_chain(certchain)
def test_success_cn_with_csr_option(self):
""" Successfully issue a certificate using CSR option outside from the config file"""
"""Successfully issue a certificate using CSR option outside from the config file."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -117,7 +119,7 @@ class TestACMEDNSTiny(unittest.TestCase):
self._assert_certificate_chain(certchain)
def test_success_wild_cn(self):
""" Successfully issue a certificate via a wildcard common name """
"""Successfully issue a certificate via a wildcard common name."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -130,16 +132,16 @@ class TestACMEDNSTiny(unittest.TestCase):
self._assert_certificate_chain(certchain)
def test_success_dnshost_ip(self):
""" When DNS Host is an IP, DNS resolution have to fail without error """
"""When DNS Host is an IP, DNS resolution have to fail without error."""
old_stdout = sys.stdout
sys.stdout = StringIO()
with self.assertLogs(level='INFO') as adnslog:
acme_dns_tiny.main([self.configs['dns_host_ip'],
"--verbose"])
self.assertIn("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured dns \
host: we will use either resource found if one exists or directly the DNS Host configuration.",
adnslog.output)
self.assertIn(("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured "
"dns host: we will use either resource found if one exists or directly the "
"DNS Host configuration."), adnslog.output)
certchain = sys.stdout.getvalue()
sys.stdout.close()
......@@ -148,7 +150,7 @@ host: we will use either resource found if one exists or directly the DNS Host c
self._assert_certificate_chain(certchain)
def test_success_san(self):
""" Successfully issue a certificate via subject alt name """
"""Successfully issue a certificate via subject alt name."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -161,7 +163,7 @@ host: we will use either resource found if one exists or directly the DNS Host c
self._assert_certificate_chain(certchain)
def test_success_wildsan(self):
""" Successfully issue a certificate via wildcard in subject alt name """
"""Successfully issue a certificate via wildcard in subject alt name."""
old_stdout = sys.stdout
sys.stdout = StringIO()
......@@ -174,7 +176,7 @@ host: we will use either resource found if one exists or directly the DNS Host c
self._assert_certificate_chain(certchain)
def test_success_cli(self):
""" Successfully issue a certificate via command line interface """
"""Successfully issue a certificate via command line interface."""
certout, _ = subprocess.Popen([
"python3", "acme_dns_tiny.py", self.configs['good_cname'], "--verbose"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
......@@ -184,7 +186,7 @@ host: we will use either resource found if one exists or directly the DNS Host c
self._assert_certificate_chain(certchain)
def test_success_cli_with_csr_option(self):
""" Successfully issue a certificate via command line interface using CSR option"""
"""Successfully issue a certificate via command line interface using CSR option."""
certout, _ = subprocess.Popen([
"python3", "acme_dns_tiny.py", "--csr", self.configs['cname_csr'],
self.configs['good_cname_without_csr'], "--verbose"
......@@ -195,22 +197,23 @@ host: we will use either resource found if one exists or directly the DNS Host c
self._assert_certificate_chain(certchain)
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
"""Let's Encrypt rejects weak keys."""
self.assertRaisesRegex(ValueError,
"key too small",
acme_dns_tiny.main, [self.configs['weak_key'], "--verbose"])
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
"""Can't use the account key for the CSR."""
self.assertRaisesRegex(ValueError,
"certificate public key must be different than account key",
acme_dns_tiny.main, [self.configs['account_as_domain'], "--verbose"])
def test_failure_dns_update_tsigkeyname(self):
""" Fail to update DNS records by invalid TSIG Key name """
"""Fail to update DNS records by invalid TSIG Key name."""
self.assertRaisesRegex(ValueError,
"Error updating DNS",
acme_dns_tiny.main, [self.configs['invalid_tsig_name'], "--verbose"])
if __name__ == "__main__": # pragma: no cover
unittest.main()
......@@ -7,6 +7,7 @@ import dns.version
import acme_dns_tiny
from tests.config_factory import generate_acme_dns_tiny_unit_test_config
class TestACMEDNSTiny(unittest.TestCase):
"Tests for acme_dns_tiny.get_crt()"
......@@ -31,11 +32,11 @@ class TestACMEDNSTiny(unittest.TestCase):
os.remove(cls.configs[conffile])
super(TestACMEDNSTiny, cls).tearDownClass()
def test_failure_notcompleted_configuration(self):
""" Configuration file have to be completed """
self.assertRaisesRegex(ValueError, r"Some required settings are missing.",
acme_dns_tiny.main, [self.configs['missing_dns'], "--verbose"])
if __name__ == "__main__": # pragma: no cover
unittest.main()
......@@ -14,10 +14,12 @@ import requests
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())
def _b64(text):
""""Encodes text as base64 as specified in ACME RFC """
"""Encodes text as base64 as specified in ACME RFC."""
return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
def _openssl(command, options, communicate=None):
"""Run openssl command line and raise IOError on non-zero return."""
openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
......@@ -27,13 +29,14 @@ def _openssl(command, options, communicate=None):
raise IOError("OpenSSL Error: {0}".format(err))
return out
def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
"""Deactivate an ACME account"""
"""Deactivate an ACME account."""
def _send_signed_request(url, payload):
"""Sends signed requests to ACME server."""
nonlocal nonce
if payload == "": # on POST-as-GET, final payload has to be just empty string
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _b64(json.dumps(payload).encode("utf8"))
......@@ -109,8 +112,9 @@ def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
raise ValueError("Error while deactivating the account key: {0} {1}"
.format(http_response.status_code, result))
def main(argv):
"""Parse arguments and deactivate account"""
"""Parse arguments and deactivate account."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Tiny ACME script to deactivate an ACME account",
......@@ -138,5 +142,6 @@ https://acme-staging-v02.api.letsencrypt.org/directory"""
LOGGER.setLevel(args.quiet or logging.INFO)
account_deactivate(args.account_key, args.acme_directory, log=LOGGER)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
#!/usr/bin/env python3
#pylint: disable=too-many-statements
# pylint: disable=too-many-statements
"""Tiny script to rollover two keys for an ACME account"""
import sys
import argparse
......@@ -15,10 +15,12 @@ import requests
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())
def _b64(text):
""""Encodes text as base64 as specified in ACME RFC """
"""Encodes text as base64 as specified in ACME RFC."""
return base64.urlsafe_b64encode(text).decode("utf8").rstrip("=")
def _openssl(command, options, communicate=None):
"""Run openssl command line and raise IOError on non-zero return."""
openssl = subprocess.Popen(["openssl", command] + options, stdin=subprocess.PIPE,
......@@ -28,6 +30,7 @@ def _openssl(command, options, communicate=None):
raise IOError("OpenSSL Error: {0}".format(err))
return out
def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
"""Rollover the old and new account key for an ACME account."""
def _get_private_acme_signature(accountkeypath):
......@@ -50,7 +53,7 @@ def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log
def _sign_request(url, keypath, payload, is_inner=False):
"""Signs request with a specific right account key."""
nonlocal nonce
if payload == "": # on POST-as-GET, final payload has to be just empty string
if payload == "": # on POST-as-GET, final payload has to be just empty string
payload64 = ""
else:
payload64 = _b64(json.dumps(payload).encode("utf8"))
......@@ -136,8 +139,9 @@ def account_rollover(old_accountkeypath, new_accountkeypath, acme_directory, log
.format(http_response.status_code, result))
log.info("Keys rolled over.")
def main(argv):
"""Parse arguments and roll over the ACME account keys"""
"""Parse arguments and roll over the ACME account keys."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Tiny ACME client to roll over an ACME account key with another one.",
......@@ -162,5 +166,6 @@ https://acme-staging-v02.api.letsencrypt.org/directory""")
LOGGER.setLevel(args.quiet or logging.INFO)
account_rollover(args.current, args.new, args.acme_directory, log=LOGGER)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment