Commit b37284d2 authored by Daniel Roesler's avatar Daniel Roesler

added error handling tests

parent e6cad5f0
......@@ -2,7 +2,6 @@ sudo: required
dist: trusty
language: python
python:
- "2.6"
- "2.7"
- "3.3"
- "3.4"
......@@ -13,6 +12,6 @@ before_install:
install:
- pip install -r tests/requirements.txt
script:
- coverage run -m unittest tests
- coverage run --source ./ --omit ./tests/server.py -m unittest tests
after_success:
- coveralls
......@@ -13,7 +13,6 @@ LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
# helper function base64 encode for jose spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
......@@ -53,10 +52,8 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
if proc.returncode != 0:
raise IOError("OpenSSL Error: {0}".format(err))
data = json.dumps({
"header": header,
"protected": protected64,
"payload": payload64,
"signature": _b64(out),
"header": header, "protected": protected64,
"payload": payload64, "signature": _b64(out),
})
try:
resp = urlopen(url, data.encode('utf8'))
......@@ -104,7 +101,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
"identifier": {"type": "dns", "value": domain},
})
if code != 201:
raise ValueError("Error registering: {0} {1}".format(code, result))
raise ValueError("Error requesting challenges: {0} {1}".format(code, result))
# make the challenge file
challenge = [c for c in json.loads(result.decode('utf8'))['challenges'] if c['type'] == "http-01"][0]
......@@ -168,7 +165,7 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
if __name__ == "__main__":
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
......@@ -192,7 +189,10 @@ if __name__ == "__main__":
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
parser.add_argument("--ca", default=DEFAULT_CA, help="certificate authority, default is Let's Encrypt")
args = parser.parse_args()
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca)
sys.stdout.write(signed_crt)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
......@@ -16,15 +16,50 @@ def gen_keys():
account_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", account_key.name, "2048"]).wait()
# weak 1024 bit key
weak_key = NamedTemporaryFile()
Popen(["openssl", "genrsa", "-out", weak_key.name, "1024"]).wait()
# good domain key
domain_key = NamedTemporaryFile()
domain_csr = NamedTemporaryFile()
Popen(["openssl", "req", "-newkey", "rsa:2048", "-nodes", "-keyout", domain_key.name,
"-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr.name]).wait()
# subject alt-name domain
san_csr = NamedTemporaryFile()
san_conf = NamedTemporaryFile()
san_conf.write(open("/etc/ssl/openssl.cnf").read().encode("utf8"))
san_conf.write("\n[SAN]\nsubjectAltName=DNS:{0}\n".format(DOMAIN).encode("utf8"))
san_conf.seek(0)
Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
"-subj", "/", "-reqexts", "SAN", "-config", san_conf.name,
"-out", san_csr.name]).wait()
# invalid domain csr
invalid_csr = NamedTemporaryFile()
Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
"-subj", "/CN=\xC3\xA0\xC2\xB2\xC2\xA0_\xC3\xA0\xC2\xB2\xC2\xA0.com", "-out", invalid_csr.name]).wait()
# nonexistent domain csr
nonexistent_csr = NamedTemporaryFile()
Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
"-subj", "/CN=404.gethttpsforfree.com", "-out", nonexistent_csr.name]).wait()
# account-signed domain csr
account_csr = NamedTemporaryFile()
Popen(["openssl", "req", "-new", "-sha256", "-key", account_key.name,
"-subj", "/CN={0}".format(DOMAIN), "-out", account_csr.name]).wait()
return {
"account_key": account_key,
"weak_key": weak_key,
"domain_key": domain_key,
"domain_csr": domain_csr,
"san_csr": san_csr,
"invalid_csr": invalid_csr,
"nonexistent_csr": nonexistent_csr,
"account_csr": account_csr,
}
# fake a folder structure to catch the key authorization file
......
import unittest, os, tempfile
from subprocess import Popen
import unittest, os, sys, tempfile
from subprocess import Popen, PIPE
try:
from StringIO import StringIO # Python 2
except ImportError:
from io import StringIO # Python 3
import acme_tiny
from . import monkey
from .monkey import gen_keys
KEYS = gen_keys()
class TestModule(unittest.TestCase):
"Tests for acme_tiny.get_crt()"
def setUp(self):
self.CA = "https://acme-staging.api.letsencrypt.org"
self.keys = monkey.gen_keys()
self.tempdir = tempfile.mkdtemp()
self.fuse_proc = Popen(["python", "tests/monkey.py", self.tempdir])
......@@ -18,9 +23,134 @@ class TestModule(unittest.TestCase):
self.fuse_proc.wait()
os.rmdir(self.tempdir)
def test_success(self):
result = acme_tiny.get_crt(
self.keys['account_key'].name,
self.keys['domain_csr'].name,
self.tempdir,
CA=self.CA)
def test_success_cn(self):
""" Successfully issue a certficate via common name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=happy hacker fake CA", out.decode("utf8"))
def test_success_san(self):
""" Successfully issue a certficate via subject alt name """
old_stdout = sys.stdout
sys.stdout = StringIO()
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['san_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
sys.stdout.seek(0)
crt = sys.stdout.read().encode("utf8")
sys.stdout = old_stdout
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=happy hacker fake CA", out.decode("utf8"))
def test_success_cli(self):
""" Successfully issue a certficate via command line interface """
crt, err = Popen([
"python", "acme_tiny.py",
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
], stdout=PIPE, stderr=PIPE).communicate()
out, err = Popen(["openssl", "x509", "-text", "-noout"], stdin=PIPE,
stdout=PIPE, stderr=PIPE).communicate(crt)
self.assertIn("Issuer: CN=happy hacker fake CA", out.decode("utf8"))
def test_missing_account_key(self):
""" OpenSSL throws an error when the account key is missing """
try:
result = acme_tiny.main([
"--account-key", "/foo/bar",
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, IOError)
self.assertIn("Error opening Private Key", result.args[0])
def test_missing_csr(self):
""" OpenSSL throws an error when the CSR is missing """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", "/foo/bar",
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, IOError)
self.assertIn("Error loading /foo/bar", result.args[0])
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
try:
result = acme_tiny.main([
"--account-key", KEYS['weak_key'].name,
"--csr", KEYS['domain_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Key too small", result.args[0])
def test_invalid_domain(self):
""" Let's Encrypt rejects invalid domains """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['invalid_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Invalid character in DNS name", result.args[0])
def test_nonexistant_domain(self):
""" Should be unable verify a nonexistent domain """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['nonexistent_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("but couldn't download", result.args[0])
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
try:
result = acme_tiny.main([
"--account-key", KEYS['account_key'].name,
"--csr", KEYS['account_csr'].name,
"--acme-dir", self.tempdir,
"--ca", self.CA,
])
except Exception as e:
result = e
self.assertIsInstance(result, ValueError)
self.assertIn("Certificate public key must be different than account key", result.args[0])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment