* added read service (kavita) * renamed script to match repo bugs: * chown keys after custom site scripts in case of pfx or other generation (this was not getting chowned before and breaking the site)
357 lines
12 KiB
Python
Executable File
357 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Automation for cert renewal.
|
|
|
|
assumptions:
|
|
* firewall has access configured for specified key
|
|
* firewall sshd config contains: `AcceptEnv ssl_service state`
|
|
* firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x
|
|
"""
|
|
|
|
import datetime
|
|
import logging
|
|
import getpass
|
|
import os
|
|
import pathlib
|
|
import pexpect
|
|
import pwd
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
supported_services = [
|
|
'git', 'plex', 'jellyfin', 'photoprism', 'nextcloud', 'read']
|
|
|
|
restart_delay = {
|
|
'plex': 10
|
|
}
|
|
|
|
pfx_key_path = {
|
|
'plex': '/data/plex/certs/certificate.pfx',
|
|
'jellyfin': '/etc/letsencrypt/live/jellyfin.drheck.dev/jellyfin.pfx',
|
|
}
|
|
|
|
# Cert owning user if different than the name of the service
|
|
users = {
|
|
'git': 'gitea',
|
|
'read': 'http',
|
|
}
|
|
|
|
# systemd service names that don't match the service name
|
|
# service : systemd_service
|
|
systemd_services = {
|
|
'git': 'gitea',
|
|
'plex': 'plexmediaserver',
|
|
'read': 'kavita',
|
|
}
|
|
|
|
cert_files = ['privkey1.pem', 'fullchain1.pem', 'chain1.pem', 'cert1.pem']
|
|
|
|
router = 'danknasty'
|
|
router_user = 'luke131'
|
|
router_key = '/root/.ssh/id_autofirewall'
|
|
server_user = 'luke'
|
|
|
|
|
|
def firewall_mod(state, service, decrypt_pp):
|
|
os.environ['state'] = 'HTTP_UP'
|
|
os.environ['ssl_service'] = service
|
|
cmd = ['/usr/bin/ssh', '-i', router_key, '-o',
|
|
'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', router_user,
|
|
router, 'doas', '-n', '/usr/local/bin/ssl-update.sh']
|
|
log.info(f'env for fw: state: {os.environ["state"]}')
|
|
log.info(f'env for fw: ssl_service: {service}')
|
|
log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"')
|
|
p = pexpect.spawnu(' '.join(cmd))
|
|
p.logfile = sys.stderr
|
|
log.info(f'key string: {router_key}')
|
|
res = p.expect([f'''Enter passphrase for key ['"]{router_key}['"]:''',
|
|
pexpect.TIMEOUT, pexpect.EOF])
|
|
if res > 0:
|
|
sys.exit('Couldnt send decryption key to ssh.')
|
|
p.sendline(decrypt_pp)
|
|
res = p.expect(['success', pexpect.TIMEOUT, pexpect.EOF])
|
|
if res > 0:
|
|
sys.exit(f'Failed. error: {p.before}')
|
|
state_print = state.split('_')[1].lower()
|
|
log.info(f'Turned {state_print} HTTP for {service}')
|
|
|
|
|
|
def get_cert_dates(url, port=443):
|
|
cmd = (
|
|
f'printf "" | /usr/bin/openssl s_client -servername {url} -connect '
|
|
f'{url}:{port} | openssl x509 -noout -dates')
|
|
|
|
ps = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
bstdout, bstderr = ps.communicate()
|
|
exit_code = ps.returncode
|
|
stdout = bstdout.decode('UTF-8').splitlines()
|
|
stderr = bstderr.decode('UTF-8')
|
|
if exit_code > 0:
|
|
sys.exit(f'Error checking state of SSL cert at {url}:{port}: {stderr}')
|
|
start_str = ''
|
|
finish_str = ''
|
|
for line in stdout:
|
|
if 'notBefore' in line:
|
|
start_str = line.split('=')[1]
|
|
if 'notAfter' in line:
|
|
finish_str = line.split('=')[1]
|
|
|
|
ts = datetime.datetime.now()
|
|
start = datetime.datetime.strptime(start_str, '%b %d %H:%M:%S %Y %Z')
|
|
finish = datetime.datetime.strptime(finish_str, '%b %d %H:%M:%S %Y %Z')
|
|
if ts > start and ts < finish:
|
|
print('Cert is valid')
|
|
else:
|
|
print('Cert is not valid')
|
|
print(f'start: {start}\nnow: {ts}\nfinish: {finish}')
|
|
|
|
|
|
def recurse_rmdir(directory):
|
|
directory = pathlib.Path(directory)
|
|
for item in directory.iterdir():
|
|
if item.is_dir():
|
|
recurse_rmdir(item)
|
|
else:
|
|
item.unlink()
|
|
directory.rmdir()
|
|
|
|
|
|
def jellyfin():
|
|
log.info('custom function for jellyfin actions')
|
|
pfx_gen('jellyfin')
|
|
|
|
|
|
def plex():
|
|
log.info('custom function for plex actions')
|
|
pfx_gen('plex')
|
|
|
|
|
|
def restart(service):
|
|
if service in restart_delay:
|
|
wait = restart_delay[service]
|
|
else:
|
|
wait = 5
|
|
|
|
try:
|
|
systemd_service = systemd_services[service]
|
|
except KeyError:
|
|
systemd_service = service
|
|
log.info(f'going to restart service: {service}')
|
|
cmd = ['/usr/bin/systemctl', 'restart', systemd_service]
|
|
log.info(f'cmd to restart service: "{" ".join(cmd)}"')
|
|
p = subprocess.run(cmd, capture_output=True)
|
|
log.info(f'sleeping {wait} seconds')
|
|
time.sleep(wait)
|
|
|
|
cmd = ['/usr/bin/systemctl', 'is-active', '--quiet', systemd_service]
|
|
log.info(f'cmd to check service status: "{" ".join(cmd)}"')
|
|
p = subprocess.run(cmd)
|
|
if p.returncode != 0:
|
|
log.error(f'{service} failed to restart:')
|
|
cmd = ['/usr/bin/systemctl', 'status', systemd_service]
|
|
log.info(f'cmd to show status of service: "{" ".join(cmd)}"')
|
|
p = subprocess.run(cmd, capture_output=True)
|
|
stderr = p.stderr.decode('UTF-8')
|
|
sys.stderr.write(stderr)
|
|
sys.exit(1)
|
|
log.info(f'{service} has restarted OK')
|
|
|
|
|
|
def pfx_gen(service):
|
|
log.info(f'generate pfx file for {service}')
|
|
cmd = ['/usr/bin/su', '-l', server_user,
|
|
'/usr/bin/pass', 'show', f'ssl/{service}']
|
|
log.info(f'cmd to get password to encrypt private key: "{" ".join(cmd)}"')
|
|
p = subprocess.run(cmd, capture_output=True)
|
|
export_pw = p.stdout.decode('UTF-8').strip()
|
|
if not export_pw:
|
|
sys.exit(f'Couldnt get ssl export password for {service}')
|
|
try:
|
|
pkp = pfx_key_path[service]
|
|
except KeyError:
|
|
sys.exit(f'{service} has no defined private key path.')
|
|
|
|
cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', pkp,
|
|
'-inkey', f'/etc/letsencrypt/live/{service}.drheck.dev/privkey.pem',
|
|
'-in', f'/etc/letsencrypt/live/{service}.drheck.dev/cert.pem',
|
|
'-certfile', f'/etc/letsencrypt/live/{service}.drheck.dev/chain.pem']
|
|
log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"')
|
|
p = pexpect.spawnu(' '.join(cmd))
|
|
p.logfile = sys.stderr
|
|
res = p.expect(['Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT])
|
|
if res > 0:
|
|
sys.exit('Failed to run openssl to generate '
|
|
f'pkcs12 keys for {service}: {p.before}')
|
|
p.sendline(export_pw)
|
|
log.info(f'send password to encrypt private key')
|
|
res = p.expect(['Verifying - Enter Export Password:',
|
|
pexpect.EOF, pexpect.TIMEOUT])
|
|
if res > 0:
|
|
sys.exit('Failed to run openssl to generate '
|
|
f'pkcs12 keys for {service}: {p.before}')
|
|
p.sendline(export_pw)
|
|
log.info(f'send password to encrypt private key...again')
|
|
res = p.expect([pexpect.EOF, pexpect.TIMEOUT])
|
|
if res > 0:
|
|
sys.exit(f'Failed to run openssl to generate '
|
|
'pkcs12 keys for {service}: {p.before}')
|
|
log.info(f'this did not explicitly fail')
|
|
|
|
|
|
def main(args):
|
|
# logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING"))
|
|
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
|
|
|
|
log.info(f'program start: {sys.argv}')
|
|
if len(args) != 1:
|
|
sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')
|
|
service = args[0]
|
|
if service not in supported_services:
|
|
sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')
|
|
|
|
uid = os.getuid()
|
|
if uid != 0:
|
|
sys.exit('Run as root')
|
|
|
|
log.info('Get SSH decryption pw')
|
|
cmd = ['/usr/bin/su', '-l', server_user,
|
|
'/usr/bin/pass', 'show', 'ssh/autofirewall']
|
|
log.info(f'cmd: "{" ".join(cmd)}"')
|
|
p = subprocess.run(cmd, capture_output=True)
|
|
decrypt_pp = p.stdout.decode('UTF-8').strip()
|
|
if not decrypt_pp:
|
|
sys.exit('Could not get decryption passpharase')
|
|
log.info('Got SSH decryption pw')
|
|
|
|
challenge_path = pathlib.Path(
|
|
'/usr/share/nginx/html/.well-known/acme-challenge/')
|
|
if challenge_path.is_dir():
|
|
recurse_rmdir(challenge_path)
|
|
log.info('Challenge path deleted')
|
|
challenge_path.mkdir()
|
|
log.info('Challenge path created')
|
|
challenge_path.chmod(0o755)
|
|
log.info('Challenge path chmodded')
|
|
|
|
fqdn = f'{service}.drheck.dev'
|
|
log.info(f'fqdn: {fqdn}')
|
|
# Dry run:
|
|
# cmd = ['/usr/bin/certbot', '--dry-run', 'certonly', '--manual', '-d', fqdn]
|
|
# Real run:
|
|
cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', fqdn]
|
|
log.info(f'certbot cmd: "{" ".join(cmd)}"')
|
|
cb = pexpect.spawnu(' '.join(cmd))
|
|
res = cb.expect(
|
|
['Create a file containing just this data:\r\n\r\n([^\r]+)\r',
|
|
('You have an existing certificate that has exactly the '
|
|
"same domains or certificate name you requested and isn't "
|
|
'close to expiry'), pexpect.TIMEOUT, pexpect.EOF],
|
|
timeout=20)
|
|
if res > 1:
|
|
sys.exit('Timed out')
|
|
if res == 1:
|
|
log.info('Current cert is not yet expired')
|
|
res = cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF])
|
|
if res > 0:
|
|
sys.exit('Timed out in setup with existing cert')
|
|
cb.sendline('2')
|
|
res = cb.expect(
|
|
['Create a file containing just this data:\r\n\r\n([^\r]+)\r',
|
|
pexpect.TIMEOUT, pexpect.EOF], timeout=20)
|
|
if res > 1:
|
|
sys.exit('Timed out')
|
|
|
|
data = cb.match.group(1)
|
|
log.info(f'secret data: {data}')
|
|
log.info('the data string and location for the shared secret are known')
|
|
long_match = ('And make it available on your web server at this URL:'
|
|
'\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r')
|
|
res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF])
|
|
if res > 0:
|
|
sys.exit('Timed out')
|
|
filename = cb.match.group(1)
|
|
log.info(f'filename of secret: {filename}')
|
|
res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0)
|
|
|
|
data_file = challenge_path / pathlib.Path(filename)
|
|
try:
|
|
with open(data_file, 'w') as f:
|
|
f.write(data)
|
|
except:
|
|
sys.exit(f'Failed to write {data_file}')
|
|
log.info('created secret file with secret data')
|
|
data_file.chmod(0o644)
|
|
log.info('and chmodded')
|
|
|
|
symlink_name = pathlib.Path(f'{service}-le')
|
|
log.info(f'nginx symlink: {symlink_name}')
|
|
nx_conf = pathlib.Path('/etc/nginx')
|
|
avail_path = nx_conf / pathlib.Path('sites-available')
|
|
enabled_path = nx_conf / pathlib.Path('sites-enabled')
|
|
|
|
service_available_file = avail_path / symlink_name
|
|
log.info(f'nginx service avail symlink: {service_available_file}')
|
|
service_enabled_symlink = enabled_path / symlink_name
|
|
log.info(f'nginx service enabled symlink: {service_enabled_symlink}')
|
|
if not service_enabled_symlink.is_symlink():
|
|
service_enabled_symlink.symlink_to(service_available_file)
|
|
log.info('created symlink to enable service')
|
|
|
|
log.info(f'open port 80 to {service}')
|
|
firewall_mod('HTTP_UP', service, decrypt_pp)
|
|
restart('nginx')
|
|
|
|
cb.sendline()
|
|
log.info(f'sent <enter> to certbot to continue process')
|
|
res = cb.expect([pexpect.EOF])
|
|
log.info(f'cerbot completed. final output: {cb.before}')
|
|
|
|
if 'failed' in cb.before:
|
|
sys.exit('Something went wrong')
|
|
|
|
log.info(f'open port 80 to {service}')
|
|
firewall_mod('HTTP_DOWN', service, decrypt_pp)
|
|
|
|
service_enabled_symlink.unlink()
|
|
log.info('removed symlink in nginx to disable HTTP')
|
|
|
|
restart('nginx')
|
|
|
|
key_path = pathlib.Path('/etc/letsencrypt')
|
|
live = key_path / pathlib.Path(f'live/{service}.drheck.dev')
|
|
archive = key_path / pathlib.Path(f'archive/{service}.drheck.dev')
|
|
log.info(f'live keypath: {live}')
|
|
log.info(f'archive keypath: {archive}')
|
|
|
|
if service in globals():
|
|
log.info(f'{service} has a service specific function to run')
|
|
eval(f'{service}()')
|
|
log.info(f'{service} specific work complete.')
|
|
|
|
user = service
|
|
if service in users:
|
|
user = users[service]
|
|
log.info(f'service {service} has user {user}')
|
|
uid = pwd.getpwnam(user).pw_uid
|
|
gid = pwd.getpwnam(user).pw_gid
|
|
|
|
# chown after custom service in case pfx or other key is generated
|
|
|
|
log.info(f'uid: {uid} gid: {gid}')
|
|
os.chown(live, uid, gid)
|
|
log.info(f'live keypath chmodded')
|
|
os.chown(archive, uid, gid)
|
|
log.info(f'archive keypath chmodded')
|
|
for cert_file in cert_files:
|
|
os.chown(archive / pathlib.Path(cert_file), uid, gid)
|
|
log.info(f'{cert_file} chowned to service user')
|
|
log.info(f'chmodded new keys from certbot')
|
|
|
|
restart(service)
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
if __name__ == '__main__':
|
|
main(sys.argv[1:])
|