Commit 5ff96857 authored by Adrien Dorsaz's avatar Adrien Dorsaz

Merge branch 'v2' into 'master'

V2 : Support of Let's Encrypt API v2

See merge request !12
parents 4e83caf6 d952255e
Pipeline #206 failed with stage
in 2 minutes and 59 seconds
This diff is collapsed.
[acmednstiny]
# Required readable ACME account key
AccountKeyFile = account.key
# Required readable CSR file
# Note: if you use the "--csr" optional argument, this setting is not read and can be omitted
CSRFile = domain.csr
# Optional ACME directory url (default: https://acme-staging.api.letsencrypt.org/directory)
ACMEDirectory = https://acme-staging.api.letsencrypt.org/directory
# Optional time in seconds to wait between DNS update and challenge check (default: 3)
CheckChallengeDelay = 3
# Optional Contact info to send to the ACME provider
MailContact = mail@example.com
# Note that Let's Encrypt servers disallow use of phone numbers
PhoneContact = +11111111111
# Optional ACME directory url
# Default: https://acme-staging-v02.api.letsencrypt.org/directory
ACMEDirectory = https://acme-staging-v02.api.letsencrypt.org/directory
# Optional To be able to be reached by ACME provider (e.g. to warn about
# certificate expicration), you can provide some contact informations.
# Contacts setting is a list of contact URI separated by semicolon (;).
# If ACME provider support contact informations, it must at least support mailto
# URI and can support more of contact.
# For the mailto URI, the email address part must contains only one address
# without header fields (see [RFC6068]).
# Default: none
Contacts = mailto:mail@example.com;mailto:mail2@example.org
# Optional to give hint to the ACME server about your prefered language for errors given by their server
# See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Language for more informations
# Default: en
Language = en
[TSIGKeyring]
# Required TSIG key name
KeyName = host-example
# Required TSIG key value in base64
KeyValue = XXXXXXXXXXX==
# Required TSIG algorithm
Algorithm = hmac-sha256
[DNS]
# Required name of zone to update
Zone = dnszone
# Required name or IP of DNS server
Host = dnsserver
# Optional port to connect on DNS server (default: 53)
Port = 53
# Optional time to live (TTL) value used to add DNS entries
# For each domain registered in the CSR, at least 1 TTL is waited before certificate creation.
# If an error occurs while looking for TXT records, we wait up to 10 TTLs by domain.
# That's why the default is only of 10 seconds, to avoid having too long time to wait to receive a new certificate.
# Default: 10 seconds
TTL = 10
FROM debian:jessie-backports
RUN apt-get update
RUN apt-get upgrade -y
# Minimal tools required by acme-dns-tiny CI
RUN apt-get install -y \
python3-coverage \
python3-pip
RUN apt-get install -y \
-t jessie-backports \
python3-configargparse \
python3-dnspython
# Allows run python3-coverage with same command than manual install by pip
RUN update-alternatives --install \
/usr/bin/coverage \
coverage \
/usr/bin/python3-coverage \
1
FROM debian:jessie
RUN apt-get update
RUN apt-get upgrade -y
# Minimal tools required by acme-dns-tiny CI
RUN apt-get install -y \
python3-dnspython \
python3-coverage \
python3-pip
# Allows run python3-coverage with same command than manual install by pip
RUN update-alternatives --install \
/usr/bin/python3-coverage \
coverage \
/usr/bin/python3.4-coverage \
1
RUN ln -s /etc/alternatives/coverage /usr/bin/coverage
FROM debian:stretch
RUN apt-get update
RUN apt-get upgrade -y
# Minimal tools required by acme-dns-tiny CI
RUN apt-get install -y \
python3-dnspython \
python3-coverage \
python3-configargparse \
python3-pip
# Allows run python3-coverage with same command than manual install by pip
RUN update-alternatives --install \
/usr/bin/coverage \
coverage \
/usr/bin/python3-coverage \
1
after_script:
- sleep 10
jessie:
image: adt-jessie_dnspython3_1.11
before_script:
- pip3 install --upgrade -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py
- coverage html
jessie_backport:
......@@ -15,6 +12,18 @@ jessie_backport:
before_script:
- pip3 install --upgrade -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_delete
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_delete.py
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py
- coverage html
stretch:
image: adt-stretch_dnspython3_1.15
before_script:
- pip3 install --upgrade -r tests/requirements.txt
script:
- coverage run --source ./ -m unittest -v tests.test_acme_dns_tiny tests.test_acme_account_rollover tests.test_acme_account_deactivate
- coverage report --include=acme_dns_tiny.py,tools/acme_account_rollover.py,tools/acme_account_deactivate.py
- coverage html
artifacts:
paths:
- htmlcov
......@@ -21,7 +21,7 @@ explains how to setup and test acme-tiny yourself.
## List of environment variables
* `GITLABCI_CAURL`: URL of a staging ACME server
* `GITLABCI_ACMEDIRECTORY_V2`: URL of a staging V2 ACME server
* `GITLABCI_CHALLENGEDELAY`: time to wait between dns update and self-check (set it to `0` to cover a bit more code)
* `GITLABCI_DNSHOST`: domain name to reach of your DNS server (e.g. `adorsaz.ch`)
* `GITLABCI_DNSHOSTIP`: IP address to reach of your DNS server
......
This diff is collapsed.
coverage
argparse
configparser
\ No newline at end of file
configparser
requests
import unittest, os, time
import acme_dns_tiny
from tests.config_factory import generate_acme_account_deactivate_config
import tools.acme_account_deactivate
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory")
class TestACMEAccountDeactivate(unittest.TestCase):
"Tests for acme_account_deactivate"
@classmethod
def setUpClass(self):
self.configs = generate_acme_account_deactivate_config()
try:
acme_dns_tiny.main([self.configs['config']])
except ValueError as err:
if str(err).startswith("Error register"):
raise ValueError("Fail test as account has not been registered correctly: {0}".format(err))
super(TestACMEAccountDeactivate, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# Remove temporary files
os.remove(self.configs['config'])
os.remove(self.configs['key'])
super(TestACMEAccountDeactivate, self).tearDownClass()
def test_success_account_deactivate(self):
""" Test success account key deactivate """
with self.assertLogs(level='INFO') as accountdeactivatelog:
tools.acme_account_deactivate.main(["--account-key", self.configs['key'],
"--acme-directory", ACMEDirectory])
self.assertIn("INFO:acme_account_deactivate:Account key deactivated !",
accountdeactivatelog.output)
if __name__ == "__main__":
unittest.main()
import unittest, os
import acme_dns_tiny
from tests.config_factory import generate_acme_account_delete_config
import tools.acme_account_delete
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
class TestACMEAccountDelete(unittest.TestCase):
"Tests for acme_account_delete"
@classmethod
def setUpClass(self):
self.accountkey = generate_acme_account_delete_config()
super(TestACMEAccountDelete, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# close temp files correctly
self.accountkey.close()
super(TestACMEAccountDelete, self).tearDownClass()
def test_success_account_delete(self):
""" Test success account key delete """
with self.assertLogs(level='INFO') as accountdeletelog:
tools.acme_account_delete.main(["--account-key", self.accountkey.name,
"--acme-directory", ACMEDirectory])
self.assertIn("INFO:acme_account_delete:Account key deleted !",
accountdeletelog.output)
if __name__ == "__main__":
unittest.main()
import unittest, os
import unittest, os, time
import acme_dns_tiny
from tests.config_factory import generate_acme_account_rollover_config
from tools.acme_account_delete import account_delete
from tools.acme_account_deactivate import account_deactivate
import tools.acme_account_rollover
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory")
class TestACMEAccountRollover(unittest.TestCase):
"Tests for acme_account_rollover"
......@@ -12,23 +12,24 @@ class TestACMEAccountRollover(unittest.TestCase):
@classmethod
def setUpClass(self):
self.configs = generate_acme_account_rollover_config()
acme_dns_tiny.main([self.configs['config']])
super(TestACMEAccountRollover, self).setUpClass()
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# delete account key registration at end of tests
account_delete(self.configs["newaccountkey"].name, ACMEDirectory)
# deactivate account key registration at end of tests
account_deactivate(self.configs["oldaccountkey"], ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
self.configs[tmpfile].close()
os.remove(self.configs[tmpfile])
super(TestACMEAccountRollover, self).tearDownClass()
def test_success_account_rollover(self):
""" Test success account key rollover """
with self.assertLogs(level='INFO') as accountrolloverlog:
tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'].name,
"--new", self.configs['newaccountkey'].name,
tools.acme_account_rollover.main(["--current", self.configs['oldaccountkey'],
"--new", self.configs['newaccountkey'],
"--acme-directory", ACMEDirectory])
self.assertIn("INFO:acme_account_rollover:Account keys rolled over !",
accountrolloverlog.output)
......
import unittest, sys, os, subprocess
import unittest, sys, os, subprocess, time
from io import StringIO
import dns.version
import acme_dns_tiny
from tests.config_factory import generate_acme_dns_tiny_config
from tools.acme_account_delete import account_delete
from tools.acme_account_deactivate import account_deactivate
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY", "https://acme-staging.api.letsencrypt.org/directory")
ACMEDirectory = os.getenv("GITLABCI_ACMEDIRECTORY_V2", "https://acme-staging-v02.api.letsencrypt.org/directory")
class TestACMEDNSTiny(unittest.TestCase):
"Tests for acme_dns_tiny.get_crt()"
......@@ -22,11 +22,11 @@ class TestACMEDNSTiny(unittest.TestCase):
# To clean ACME staging server and close correctly temporary files
@classmethod
def tearDownClass(self):
# delete account key registration at end of tests
account_delete(self.configs["accountkey"].name, ACMEDirectory)
# deactivate account key registration at end of tests
account_deactivate(self.configs["accountkey"], ACMEDirectory)
# close temp files correctly
for tmpfile in self.configs:
self.configs[tmpfile].close()
os.remove(self.configs[tmpfile])
super(TestACMEDNSTiny, self).tearDownClass()
# helper function to run openssl command
......@@ -55,7 +55,7 @@ class TestACMEDNSTiny(unittest.TestCase):
old_stdout = sys.stdout
sys.stdout = StringIO()
acme_dns_tiny.main([self.configs['goodCName'].name])
acme_dns_tiny.main([self.configs['goodCName'], "--verbose"])
certchain = sys.stdout.getvalue()
sys.stdout.close()
......@@ -63,14 +63,40 @@ class TestACMEDNSTiny(unittest.TestCase):
self.assertCertificateChain(certchain)
def test_success_cn_with_csr_option(self):
""" Successfully issue a certificate using CSR option outside from the config file"""
old_stdout = sys.stdout
sys.stdout = StringIO()
acme_dns_tiny.main(["--csr", self.configs['cnameCSR'], self.configs['goodCNameWithoutCSR'], "--verbose"])
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
self.assertCertificateChain(certchain)
def test_success_wild_cn(self):
""" Successfully issue a certificate via a wildcard common name """
old_stdout = sys.stdout
sys.stdout = StringIO()
acme_dns_tiny.main([self.configs['wildCName'], "--verbose"])
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
self.assertCertificateChain(certchain)
def test_success_dnshost_ip(self):
""" When DNS Host is an IP, DNS resolution have to fail without error """
old_stdout = sys.stdout
sys.stdout = StringIO()
with self.assertLogs(level='INFO') as adnslog:
acme_dns_tiny.main([self.configs['dnsHostIP'].name])
self.assertIn("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if exists or directly the DNS Host configuration.",
acme_dns_tiny.main([self.configs['dnsHostIP'], "--verbose"])
self.assertIn("INFO:acme_dns_tiny:A and/or AAAA DNS resources not found for configured dns host: we will use either resource found if one exists or directly the DNS Host configuration.",
adnslog.output)
certchain = sys.stdout.getvalue()
......@@ -84,7 +110,7 @@ class TestACMEDNSTiny(unittest.TestCase):
old_stdout = sys.stdout
sys.stdout = StringIO()
acme_dns_tiny.main([self.configs['goodSAN'].name])
acme_dns_tiny.main([self.configs['goodSAN'], "--verbose"])
certchain = sys.stdout.getvalue()
sys.stdout.close()
......@@ -92,39 +118,62 @@ class TestACMEDNSTiny(unittest.TestCase):
self.assertCertificateChain(certchain)
def test_success_wildsan(self):
""" Successfully issue a certificate via wildcard in subject alt name """
old_stdout = sys.stdout
sys.stdout = StringIO()
acme_dns_tiny.main([self.configs['wildSAN']])
certchain = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = old_stdout
self.assertCertificateChain(certchain)
def test_success_cli(self):
""" Successfully issue a certificate via command line interface """
certout, err = subprocess.Popen([
"python3", "acme_dns_tiny.py", self.configs['goodCName'].name
"python3", "acme_dns_tiny.py", self.configs['goodCName'], "--verbose"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
certchain = certout.decode("utf8")
self.assertCertificateChain(certchain)
def test_success_cli_with_csr_option(self):
""" Successfully issue a certificate via command line interface using CSR option"""
certout, err = subprocess.Popen([
"python3", "acme_dns_tiny.py", "--csr", self.configs['cnameCSR'], self.configs['goodCNameWithoutCSR'], "--verbose"
], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
certchain = certout.decode("utf8")
self.assertCertificateChain(certchain)
def test_weak_key(self):
""" Let's Encrypt rejects weak keys """
self.assertRaisesRegex(ValueError,
"key too small",
acme_dns_tiny.main, [self.configs['weakKey'].name])
acme_dns_tiny.main, [self.configs['weakKey'], "--verbose"])
def test_account_key_domain(self):
""" Can't use the account key for the CSR """
self.assertRaisesRegex(ValueError,
"certificate public key must be different than account key",
acme_dns_tiny.main, [self.configs['accountAsDomain'].name])
acme_dns_tiny.main, [self.configs['accountAsDomain'], "--verbose"])
def test_failure_dns_update_tsigkeyname(self):
""" Fail to update DNS records by invalid TSIG Key name """
self.assertRaisesRegex(ValueError,
"Error updating DNS",
acme_dns_tiny.main, [self.configs['invalidTSIGName'].name])
acme_dns_tiny.main, [self.configs['invalidTSIGName'], "--verbose"])
def test_failure_notcompleted_configuration(self):
""" Configuration file have to be completed """
self.assertRaisesRegex(ValueError,
"Some required settings are missing\.",
acme_dns_tiny.main, [self.configs['missingDNS'].name])
acme_dns_tiny.main, [self.configs['missingDNS'], "--verbose"])
if __name__ == "__main__":
unittest.main()
import os, argparse, subprocess, json, base64, binascii, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
#!/usr/bin/env python3
import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, requests
LOGGER = logging.getLogger("acme_account_delete")
LOGGER = logging.getLogger("acme_account_deactivate")
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def account_delete(accountkeypath, acme_directory, log=LOGGER):
def account_deactivate(accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
......@@ -20,29 +18,44 @@ def account_delete(accountkeypath, acme_directory, log=LOGGER):
raise IOError("OpenSSL Error: {0}".format(err))
return out
# helper function make signed requests
# helper function to send signed requests
def _send_signed_request(url, payload):
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or urlopen(acme_directory).getheader("Replay-Nonce", None)
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
protected["url"] = url
if url == acme_config["newAccount"]:
del protected["kid"]
else:
del protected["jwk"]
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
data = json.dumps({
"header": jws_header, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
})
jws = {
"protected": protected64, "payload": payload64, "signature": _b64(signature)
}
try:
resp = urlopen(url, data.encode("utf8"))
except HTTPError as httperror:
resp = httperror
resp = requests.post(url, json=jws, headers=joseheaders)
except requests.exceptions.RequestException as error:
resp = error.response
finally:
jws_nonce = resp.getheader("Replay-Nonce", None)
return resp.getcode(), resp.read(), resp.getheaders()
jws_nonce = resp.headers['Replay-Nonce']
if resp.text != '':
return resp.status_code, resp.json(), resp.headers
else:
return resp.status_code, json.dumps({}), resp.headers
# parse account key to get public key
log.info("Parsing account key...")
# main code
adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'}
joseheaders = copy.deepcopy(adtheaders)
joseheaders['Content-Type'] = 'application/jose+json'
log.info("Fetch informations from the ACME directory.")
directory = requests.get(acme_directory, headers=adtheaders)
acme_config = directory.json()
log.info("Parsing account key.")
accountkey = _openssl("rsa", ["-in", accountkeypath, "-noout", "-text"])
pub_hex, pub_exp = re.search(
r"modulus:\r?\n\s+00:([a-f0-9\:\s]+?)\r?\npublicExponent: ([0-9]+)",
......@@ -56,59 +69,52 @@ def account_delete(accountkeypath, acme_directory, log=LOGGER):
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
"kid": None,
}
# get ACME server configuration from the directory
directory = urlopen(acme_directory)
acme_config = json.loads(directory.read().decode("utf8"))
jws_nonce = None
log.info("Register account to get account URL.")
code, result, headers = _send_signed_request(acme_config["new-reg"], {
"resource": "new-reg"
})
if code == 201:
account_url = dict(headers).get("Location")
log.info("Registered! (account: '{0}')".format(account_url))
elif code == 409:
account_url = dict(headers).get("Location")
log.info("Already registered! (account: '{0}')".format(account_url))
log.info("Delete account...")
code, result, headers = _send_signed_request(account_url, {
"resource": "reg",
"delete": True,
})
if code not in [200,202]:
raise ValueError("Error deleting account key: {0} {1}".format(code, result))
log.info("Account key deleted !")
log.info("Ask CA provider account url.")
account_request = {}
account_request["onlyReturnExisting"] = True
code, result, headers = _send_signed_request(acme_config["newAccount"], account_request)
if code == 200:
jws_header["kid"] = headers['Location']
else:
raise ValueError("Error looking or account URL: {0} {1}".format(code, result))
log.info("Deactivating account...")
code, result, headers = _send_signed_request(jws_header["kid"], {"status": "deactivated"})
if code == 200:
log.info("Account key deactivated !")
else:
raise ValueError("Error while deactivating the account key: {0} {1}".format(code, result))
def main(argv):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""
This script *deletes* your account from an ACME server.
description="Tiny ACME client to deactivate ACME account",
epilog="""This script permanently *deactivates* an ACME account.
You should revoke your certificates *before* using this script,
as the server won't accept any further request with this account.
It will need to have access to your private account key, so
PLEASE READ THROUGH IT!
It will need to access the ACME private account key, so PLEASE READ THROUGH IT!
It's around 150 lines, so it won't take long.
=== Example Usage ===
Remove account.key from staging Let's Encrypt:
python3 acme_account_delete.py --account-key account.key --acme-directory https://acme-staging.api.letsencrypt.org/directory
"""
Example: deactivate account.key from staging Let's Encrypt:
python3 acme_account_deactivate.py --account-key account.key --acme-directory https://acme-staging-v02.api.letsencrypt.org/directory"""
)
parser.add_argument("--account-key", required = True, help="path to the private account key to delete")
parser.add_argument("--acme-directory", required = True, help="ACME directory URL of the ACME server where to remove the key")
parser.add_argument("--account-key", required=True, help="path to the private account key to deactivate")
parser.add_argument("--acme-directory", required=True, help="ACME directory URL of the ACME server where to remove the key")
parser.add_argument("--quiet", action="store_const",
const=logging.ERROR,
help="suppress output except for errors")
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
account_delete(args.account_key, args.acme_directory)
LOGGER.setLevel(args.quiet or logging.INFO)
account_deactivate(args.account_key, args.acme_directory, log=LOGGER)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])
import os, argparse, subprocess, os, json, base64, binascii, hashlib, re, copy, logging
from urllib.request import urlopen
from urllib.error import HTTPError
#!/usr/bin/env python3
import sys, argparse, subprocess, json, base64, binascii, re, copy, logging, requests
LOGGER = logging.getLogger("acme_account_rollover")
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOGGER):
# helper function base64 encode as defined in acme spec
......@@ -35,34 +33,56 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
"kty": "RSA",
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
},
"kid": None
}
return jws_header
# helper function to sign request with specified key
def _sign_request(accountkeypath, jwsheader, payload):
# helper function to sign request with specified key path
def _sign_request(url, keypath, payload):
nonlocal jws_nonce
payload64 = _b64(json.dumps(payload).encode("utf8"))
protected = copy.deepcopy(jwsheader)
protected["nonce"] = jws_nonce or urlopen(acme_directory).getheader("Replay-Nonce", None)
if keypath == accountkeypath:
protected = copy.deepcopy(jws_header)
protected["nonce"] = jws_nonce or requests.get(acme_config["newNonce"]).headers['Replay-Nonce']
elif keypath == new_accountkeypath:
protected = copy.deepcopy(new_jws_header)
if (keypath == new_accountkeypath
or url == acme_config["newAccount"]):
del protected["kid"]
else:
del protected["jwk"]
protected["url"] = url
protected64 = _b64(json.dumps(protected).encode("utf8"))
signature = _openssl("dgst", ["-sha256", "-sign", accountkeypath],
signature = _openssl("dgst", ["-sha256", "-sign", keypath],
"{0}.{1}".format(protected64, payload64).encode("utf8"))
signedjws = {
"header": jwsheader, "protected": protected64,
"payload": payload64, "signature": _b64(signature),
"protected": protected64, "payload": payload64,"signature": _b64(signature)
}
return signedjws
# helper function make signed requests
def _send_signed_request(accountkeypath, jwsheader, url, payload):
data = json.dumps(_sign_request(accountkeypath, jwsheader, payload))
def _send_signed_request(url, keypath, payload):
nonlocal jws_nonce
jws = _sign_request(url, keypath, payload)
try:
resp = urlopen(url, data.encode("utf8"))
except HTTPError as httperror:
resp = httperror
resp = requests.post(url, json=jws, headers=joseheaders)
except requests.exceptions.RequestException as error:
resp = error.response
finally:
jws_nonce = resp.getheader("Replay-Nonce", None)
return resp.getcode(), resp.read(), resp.getheaders()
jws_nonce = resp.headers['Replay-Nonce']
if resp.text != '':
return resp.status_code, resp.json(), resp.headers
else:
return resp.status_code, json.dumps({}), resp.headers
# main code
adtheaders = {'User-Agent': 'acme-dns-tiny/2.0'}
joseheaders=copy.deepcopy(adtheaders)
joseheaders['Content-Type']='application/jose+json'
log.info("Fetch informations from the ACME directory.")
directory = requests.get(acme_directory, headers=adtheaders)
acme_config = directory.json()
log.info("Parsing current account key...")
jws_header = _jws_header(accountkeypath)
......@@ -70,27 +90,21 @@ def account_rollover(accountkeypath, new_accountkeypath, acme_directory, log=LOG
log.info("Parsing new account key...")
new_jws_header = _jws_header(new_accountkeypath)