Commit 826ffde7 authored by Adrien Dorsaz's avatar Adrien Dorsaz

chain: get automatically CA's signing certificate

Note: support of Python 2 was removed to simplify scripts and to be able
to manage correctly the default configuration if the INI file don't
contain some values (as DNS port, ACME url).
parent 34b45f6e
#!/usr/bin/env python
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging, dns
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
import dns.resolver, dns.tsigkeyring, dns.update
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
try:
from configparser import ConfigParser #Python 3
except ImportError:
from ConfigParser import ConfigParser #Python 2
from configparser import ConfigParser
from urllib.request import urlopen
from urllib.parse import parse_qs
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
......@@ -19,7 +14,7 @@ def get_crt(config, log=LOGGER):
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").replace("=", "")
# helper function to send dynamic update messages
# helper function to send DNS dynamic update messages
def _update_dns(rrset, action):
dns_update = dns.update.Update(config["DNS"]["zone"], keyring=keyring, keyalgorithm=dns.name.from_text(config["TSIGKeyring"]["Algorithm"]))
if action == "add":
......@@ -47,9 +42,9 @@ def get_crt(config, log=LOGGER):
})
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read()
return resp.getcode(), resp.read(), resp.getheaders()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
return getattr(e, "code", None), getattr(e, "read", e.__str__)(), None
# create DNS keyring
keyring = dns.tsigkeyring.from_text({ config["TSIGKeyring"]["KeyName"] : config["TSIGKeyring"]["KeyValue"]})
......@@ -96,7 +91,7 @@ def get_crt(config, log=LOGGER):
# get the certificate domains and expiration
log.info("Registering account...")
code, result = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-reg", {
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-reg", {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.0.1-July-27-2015.pdf",
})
......@@ -112,7 +107,7 @@ def get_crt(config, log=LOGGER):
log.info("Verifying {0}...".format(domain))
# get new challenge
code, result = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-authz", {
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-authz", {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
......@@ -135,7 +130,7 @@ def get_crt(config, log=LOGGER):
time.sleep(10)
# notify challenge are met
code, result = _send_signed_request(challenge["uri"], {
code, result, headers = _send_signed_request(challenge["uri"], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
......@@ -165,30 +160,44 @@ def get_crt(config, log=LOGGER):
proc = subprocess.Popen(["openssl", "req", "-in", config["acmednstiny"]["CSRFile"], "-outform", "DER"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
csr_der, err = proc.communicate()
code, result = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-cert", {
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-cert", {
"resource": "new-cert",
"csr": _b64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
certificate = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
# get the parent certificate which had created this one
linkheader = [link.strip() for link in dict(headers)["Link"].split(',')]
certificate_parent_url = [re.match(r'<(?P<url>.*)>.*;rel=(up|("([a-z][a-z0-9\.\-]*\s+)*up[\s"]))', link).groupdict()
for link in linkheader][0]["url"]
resp = urlopen(certificate_parent_url)
code = resp.getcode()
result = resp.read()
if code not in [200, 201]:
raise ValueError("Error getting certificate chain from {0}: {1} {2}".format(
certificate_parent_url, code, result))
certificate_parent = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
# return signed certificate!
log.info("Certificate signed!")
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
"\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64)))
return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n-----BEGIN CERTIFICATE-----\n{1}\n-----END CERTIFICATE-----\n""".format(
certificate, certificate_parent)
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
This script automates the process of getting a signed TLS certificate from
Let's Encrypt using the ACME protocol and its dns verification.
This script automates the process of getting a signed TLS certificate
chain from Let's Encrypt using the ACME protocol and its dns verification.
It will need to have access to your private account key and dns server
so PLEASE READ THROUGH IT!
It's only ~250 lines, so it won't take long.
===Example Usage===
python acme_dns_tiny.py --config ./acmd_dns_tiny.ini > signed.crt
python acme_dns_tiny.py ./example.ini > chain.crt
See example.ini file to configure correctly this script.
===================
""")
)
......@@ -203,7 +212,7 @@ def main(argv):
if (set(["accountkeyfile", "csrfile", "caurl"]) - set(config.options("acmednstiny"))
or set(["keyname", "keyvalue", "algorithm"]) - set(config.options("TSIGKeyring"))
or set(["zone", "host", "port", "ttl"]) - set(config.options("DNS"))):
or set(["zone", "host", "port"]) - set(config.options("DNS"))):
raise ValueError("Some required settings are missing.")
LOGGER.setLevel(args.quiet or LOGGER.level)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment