Commit e199e3d4 authored by Adrien Dorsaz's avatar Adrien Dorsaz

Use of ACMEDirectory instead of CAUrl part 3/3

Enable ACMEDirectory for new-reg part.

Enable use of dynamic terms-of-service url for the agreement part too ( fixes #1 ).

Enable a way to add contact information inside account registration (email and phone).
It allows the ACME server provider to contact you on terms of service modifications.

Fix HTTPError exception handling (use correct error from urllib.error instead of IOError).
parent 86b18e09
......@@ -3,6 +3,7 @@ import argparse, subprocess, json, sys, base64, binascii, time, hashlib, re, cop
import dns.resolver, dns.tsigkeyring, dns.update
from configparser import ConfigParser
from urllib.request import urlopen
from urllib.error import HTTPError
LOGGER = logging.getLogger('acme_dns_tiny_logger')
LOGGER.addHandler(logging.StreamHandler())
......@@ -49,12 +50,21 @@ def get_crt(config, log=LOGGER):
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read(), resp.getheaders()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)(), None
except HTTPError as httperror:
return httperror.getcode(), httperror.read(), httperror.getheaders()
# get ACME server configuration from the directory
# helper function to get url from Link HTTP headers
def _get_url_link(headers, rel):
linkheaders = [link.strip() for link in dict(headers)["Link"].split(',')]
url = [re.match(r'<(?P<url>.*)>.*;rel=(' + re.escape(rel) + r'|("([a-z][a-z0-9\.\-]*\s+)*' + re.escape(rel) + r'[\s"]))', link).groupdict()
for link in linkheaders][0]["url"]
return url
# main code
log.info("Read ACME directory.")
directory = urlopen(config["acmednstiny"]["ACMEDirectory"])
acme_config = json.loads(directory.read().decode("utf8"))
current_terms = acme_config.get("meta", {}).get("terms-of-service")
# create DNS keyring and resolver
log.info("Prepare DNS tools...")
......@@ -108,19 +118,49 @@ def get_crt(config, log=LOGGER):
if san.startswith("DNS:"):
domains.add(san[4:])
# get the certificate domains and expiration
log.info("Registering account...")
code, result, headers = _send_signed_request(config["acmednstiny"]["CAUrl"] + "/acme/new-reg", {
"resource": "new-reg",
"agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf",
})
log.info("Registering ACME Account.")
reg_info = {"resource": "new-reg"}
if current_terms is not None:
reg_info["agreement"] = current_terms
reg_info["contact"] = []
reg_mailto = "mailto:{0}".format(config["acmednstiny"].get("MailContact"))
reg_phone = "tel:{0}".format(config["acmednstiny"].get("PhoneContact"))
if config["acmednstiny"].get("MailContact") is not None:
reg_info["contact"].append(reg_mailto)
if config["acmednstiny"].get("PhoneContact") is not None:
reg_info["contact"].append(reg_phone)
if len(reg_info["contact"]) == 0:
del reg_info["contact"]
code, result, headers = _send_signed_request(acme_config["new-reg"], reg_info)
if code == 201:
log.info("Registered!")
reg_received_contact = reg_info.get("contact")
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
elif code == 409:
log.info("Already registered!")
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
# Client should send empty payload to query account information
code, result, headers = _send_signed_request(account_url, {"resource":"reg"})
account_info = json.loads(result.decode("utf8"))
reg_info["agreement"] = account_info.get("agreement")
reg_received_contact = account_info.get("contact")
else:
raise ValueError("Error registering: {0} {1}".format(code, result))
log.info("Update contact information and terms of service agreement if needed.")
if current_terms is None:
current_terms = _get_url_link(headers, 'terms-of-service')
if (reg_info.get("agreement") != current_terms
or (config["acmednstiny"].get("MailContact") is not None and reg_mailto not in reg_received_contact)
or (config["acmednstiny"].get("PhoneContact") is not None and reg_phone not in reg_received_contact)):
reg_info["resource"] = "reg"
reg_info["agreement"] = current_terms
code, result, headers = _send_signed_request(account_url, reg_info)
if code == 202:
log.info("Account updated (terms of service agreed: '{0}')".format(reg_info.get("agreement")))
else:
raise ValueError("Error register update: {0} {1}".format(code, result))
# verify each domain
for domain in domains:
log.info("Verifying {0}...".format(domain))
......@@ -207,9 +247,7 @@ def get_crt(config, log=LOGGER):
certificate = "\n".join(textwrap.wrap(base64.b64encode(result).decode("utf8"), 64))
# get the parent certificate which had created this one
linkheader = [link.strip() for link in dict(headers)["Link"].split(',')]
certificate_parent_url = [re.match(r'<(?P<url>.*)>.*;rel=(up|("([a-z][a-z0-9\.\-]*\s+)*up[\s"]))', link).groupdict()
for link in linkheader][0]["url"]
certificate_parent_url = _get_url_link(headers, 'up')
resp = urlopen(certificate_parent_url)
code = resp.getcode()
result = resp.read()
......@@ -231,7 +269,7 @@ def main(argv):
chain from Let's Encrypt using the ACME protocol and its DNS verification.
It will need to have access to your private account key and dns server
so PLEASE READ THROUGH IT!
It's only ~250 lines, so it won't take long.
It's around 300 lines, so it won't take long.
===Example Usage===
python3 acme_dns_tiny.py ./example.ini > chain.crt
......
import subprocess, os, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
......@@ -9,9 +10,9 @@ LOGGER.setLevel(logging.INFO)
def delete_account(accountkeypath, log=LOGGER):
# helper function base64 encode for jose spec
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").replace("=", "")
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
# helper function to run openssl command
def _openssl(command, options, communicate=None):
......@@ -36,9 +37,9 @@ def delete_account(accountkeypath, log=LOGGER):
})
try:
resp = urlopen(url, data.encode("utf8"))
return resp.getcode(), resp.read()
except IOError as e:
return getattr(e, "code", None), getattr(e, "read", e.__str__)()
return resp.getcode(), resp.read(), resp.getheaders()
except HTTPError as httperror:
return httperror.getcode(), httperror.read(), httperror.getheaders()
# parse account key to get public key
log.info("Parsing account key...")
......@@ -61,12 +62,24 @@ def delete_account(accountkeypath, log=LOGGER):
directory = urlopen(ACMEDirectory)
acme_config = json.loads(directory.read().decode("utf8"))
# send request to delete account key
log.info("Register account to get account URL.")
code, result, headers = _send_signed_request(acme_config["new-reg"], {
"resource": "new-reg"
})
if code == 201:
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
elif code == 409:
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
log.info("Delete account...")
code, result = _send_signed_request(acme_config["new-reg"], {
code, result, headers = _send_signed_request(account_url, {
"resource": "reg",
"delete": True,
})
if code != 200:
if code not in [200,202]:
raise ValueError("Error deleting account key: {0} {1}".format(code, result))
log.info("Account key deleted !")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment