Commit 1041aa5f authored by Adrien Dorsaz's avatar Adrien Dorsaz
Browse files

tests: update test_module for DNS verficiations and clean some tests

openssl tests:
we don't manage the OpenSSL code, so these tests are useless.

exsitence domain tests:
commented as dynamic DNS updates can create new domains.

configuration tests:
configuration management has a lot changed, so we'll have to
make new tests.

Remove the code managing FUSE and use configuration files instead.

Finally, ask ACME server to delete account key as it is only temporary files
and we don't save them.
parent 0a0ef72d
import subprocess, os, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
CAURL = os.getenv("GITLABCI_CAURL", "https://acme-staging.api.letsencrypt.org")
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def delete_account(accountkeypath, log=LOGGER):
# helper function base64 encode for jose spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").replace("=", "")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
openssl = subprocess.Popen(["openssl", command] + options,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = openssl.communicate(communicate)
if openssl.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
return out
# helper function make signed requests
def _send_signed_request(url, payload):
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(header)
protected["nonce"] = urlopen(CAURL + "/directory").headers["Replay-Nonce"]
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
data = json.dumps({
"header": header, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
})
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
# parse account key to get public key
log.info("Parsing account key...")
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
accountkey.decode("utf8"), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
header = {
"alg": "RS256",
"jwk": {
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
}
# send request to delete account key
log.info("Delete account...")
code, result = _send_signed_request(CAURL + "/acme/new-reg", {
"resource": "reg",
"delete": True,
})
if code != 200:
raise ValueError("Error deleting account key: {0} {1}".format(code, result))
log.info("Account key deleted !")
import unittest, os, sys, tempfile
import unittest, sys
from subprocess import Popen, PIPE
try:
from StringIO import StringIO # Python 2
except ImportError:
from io import StringIO # Python 3
from io import StringIO
import acme_dns_tiny
from .monkey import gen_configs
from .acme_account_delete import delete_account
import acme_tiny
from .monkey import gen_keys
KEYS = gen_keys()
CONFIGS = gen_configs()
class TestModule(unittest.TestCase):
"Tests for acme_tiny.get_crt()"
def setUp(self):
self.CA = "https://acme-staging.api.letsencrypt.org"
self.tempdir = tempfile.mkdtemp()
self.fuse_proc = Popen(["python", "tests/monkey.py", self.tempdir])
def tearDown(self):
self.fuse_proc.terminate()
self.fuse_proc.wait()
os.rmdir(self.tempdir)
"Tests for acme_dns_tiny.get_crt()"
def test_success_cn(self):
""" Successfully issue a certificate via common name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
result = acme_dns_tiny.main([CONFIGS['goodCName'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=Fake LE Intermediate", out.decode("utf8"))
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
def test_success_san(self):
""" Successfully issue a certificate via subject alt name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['san_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
result = acme_dns_tiny.main([CONFIGS['goodSAN'].name])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=Fake LE Intermediate", out.decode("utf8"))
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
def test_success_cli(self):
""" Successfully issue a certificate via command line interface """
crt, err = Popen([
"python", "acme_tiny.py",
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
"python3", "acme_dns_tiny.py", CONFIGS['goodCName'].name
], stdout=PIPE, stderr=PIPE).communicate()
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=Fake LE Intermediate", out.decode("utf8"))
def test_missing_account_key(self):
""" OpenSSL throws an error when the account key is missing """
try:
result = acme_tiny.main([
"--account-key", "/foo/bar",
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, IOError)
self.assertIn("Error opening Private Key", result.args[0])
def test_missing_csr(self):
""" OpenSSL throws an error when the CSR is missing """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", "/foo/bar",
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, IOError)
self.assertIn("Error loading /foo/bar", result.args[0])
self.assertIn("BEGIN", crt.decode("utf8"))
self.assertIn("Issuer", out.decode("utf8"))
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
try:
result = acme_tiny.main([
"--account-key", KEYS['weak_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
result = acme_dns_tiny.main([CONFIGS['weakKey'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Key too small", result.args[0])
def test_invalid_domain(self):
""" Let's Encrypt rejects invalid domains """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['invalid_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Invalid character in DNS name", result.args[0])
def test_nonexistant_domain(self):
""" Should be unable verify a nonexistent domain """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['nonexistent_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("but couldn't download", result.args[0])
# def test_invalid_domain(self):
# """ Let's Encrypt rejects invalid domains """
# try:
# result = acme_dns_tiny.main([CONFIGS["invalidCSR"].name])
# except Exception as e:
# result = e
# self.assertIsInstance(result, ValueError)
# self.assertIn("Invalid character in DNS name", result.args[0])
#
# def test_nonexistant_domain(self):
# """ Should be unable verify a nonexistent domain """
# try:
# result = acme_dns_tiny.main([CONFIGS["inexistantDomain"].name])
# except Exception as e:
# result = e
# self.assertIsInstance(result, ValueError)
# self.assertIn("urn:acme:error:connection", result.args[0])
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['account_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
result = acme_dns_tiny.main([CONFIGS['accountAsDomain'].name])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Certificate public key must be different than account key", result.args[0])
if __name__ == "__main__":
unittest.main()
# delete account key registration at end of tests
delete_account(CONFIGS["key"]["accountkey"].name)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment