ssl-update/ssl-update.py

385 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=W1309 W1203 W1510 C0116 W1401 C0103 R1715 R0914
"""Automation for cert renewal.
assumptions:
* firewall has access configured for specified key
* firewall sshd config contains: `AcceptEnv ssl_service state`
* firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x
"""
import logging
import os
import pathlib
import pwd
import subprocess
import sys
import time
import pexpect
# Connection details for the firewall host; firewall_mod() uses them to build
# its ssh command line.
firewall = '10.0.0.1'
firewall_user = 'luke'
firewall_key = '/root/.ssh/id_autofirewall'
# Local account whose `pass` store is queried (via `su -l`) for secrets.
server_user = 'luke'
# Parent domain; the certificate FQDN is built as '<service>.<domain>'.
domain = 'drheck.dev'
# Service names accepted on the command line (checked in main()).
supported_services = [
    'chat',
    'git',
    'jellyfin',
    'nextcloud',
    'photoprism',
    'plex',
    'read',
    'sync',
    'www',
]
# Seconds restart() sleeps between restarting a unit and checking that it is
# active; services not listed here use restart()'s default of 5.
restart_delay = {
    'plex': 10
}
# Output path of the PKCS#12 bundle written by pfx_gen(), per service.
pfx_key_path = {
    'plex': '/data/plex/certs/certificate.pfx',
    'jellyfin': '/data/jellyfin/ssl/jellyfin.pfx',
}
# Cert owning user if different than the name of the service
users = {
    'git': 'gitea',
    'read': 'http',
    'chat': 'synapse',
    'sync': 'syncv3',
    'www': 'http',
    'spacebar': '_spacebar',
}
# systemd service names that don't match the service name
# service : systemd_service
systemd_services = {
    'git': 'gitea',
    'plex': 'plexmediaserver',
    'read': 'kavita',
    'nextcloud': 'php-fpm',
    'chat': 'synapse',
    'sync': 'sliding-sync',
    'www': 'nginx',
}
# NOTE(review): `remote` is not referenced by any function visible in this
# file; presumably per-host login details for remote machines - confirm.
remote = {
    'rng': {
        'user': 'luke',
        'auth': 'id_m',
    },
}
def firewall_mod(state, service, decrypt_pp):
    """Run ``ssl-update.sh`` on the firewall to switch HTTP access for a service.

    The requested state and the service name are passed to the remote script
    through the ``state`` and ``ssl_service`` environment variables, which
    ssh forwards with ``SendEnv``.

    Args:
        state: State string sent to the firewall. Callers in this file pass
            'HTTP_UP' or 'HTTP_DOWN'; the part after the underscore is used
            in the final log message.
        service: Service name exported as ``ssl_service``.
        decrypt_pp: Passphrase answered at ssh's key passphrase prompt.

    Raises:
        SystemExit: if the passphrase prompt never appears, or the remote
            script does not print 'success'.
    """
    # Export the state the caller asked for. This used to be hard-coded to
    # 'HTTP_UP', so a request for 'HTTP_DOWN' never reached the firewall.
    os.environ['state'] = state
    os.environ['ssl_service'] = service
    cmd = ['/usr/bin/ssh', '-i', firewall_key, '-o',
           'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', firewall_user,
           firewall, '/usr/local/bin/ssl-update.sh']
    log.info(f'env for fw: state: {os.environ["state"]}')
    log.info(f'env for fw: ssl_service: {service}')
    log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"')
    p = pexpect.spawnu(' '.join(cmd))
    # Mirror the ssh session to stderr so the exchange is visible.
    p.logfile = sys.stderr
    log.info(f'key string: {firewall_key}')
    # Index 0 is the passphrase prompt; TIMEOUT/EOF (index > 0) are failures.
    res = p.expect([f'''Enter passphrase for key ['"]{firewall_key}['"]:''',
                    pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit('Couldnt send decryption key to ssh.')
    p.sendline(decrypt_pp)
    # The remote script is expected to print 'success' when it is done.
    res = p.expect(['success', pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit(f'Failed. error: {p.before}')
    state_print = state.split('_')[1].lower()
    log.info(f'Turned {state_print} HTTP for {service}')
def recurse_rmdir(directory):
    """Delete *directory* and everything beneath it.

    Symbolic links found inside the tree are removed as links and are never
    followed, so nothing outside *directory* is touched.

    Args:
        directory: Path (``str`` or ``pathlib.Path``) of the directory to
            remove.

    Raises:
        OSError: propagated from the underlying unlink/rmdir calls.
    """
    directory = pathlib.Path(directory)
    for item in directory.iterdir():
        # is_dir() follows symlinks; without the is_symlink() check a link to
        # a directory would have its target's contents deleted and rmdir()
        # would then fail on the link itself.
        if item.is_dir() and not item.is_symlink():
            recurse_rmdir(item)
        else:
            item.unlink()
    directory.rmdir()
def jellyfin():
    """Service hook for jellyfin: regenerate its PKCS#12 bundle.

    run_cert_bot() looks this function up by service name, so the function
    name must stay equal to the service name.
    """
    service_name = 'jellyfin'
    log.info('custom function for jellyfin actions')
    pfx_gen(service_name)
def plex():
    """Service hook for plex: regenerate its PKCS#12 bundle.

    run_cert_bot() looks this function up by service name, so the function
    name must stay equal to the service name.
    """
    service_name = 'plex'
    log.info('custom function for plex actions')
    pfx_gen(service_name)
def restart(service):
    """Restart the systemd unit behind *service* and verify it is active.

    The unit name comes from ``systemd_services`` (falling back to the
    service name itself). After the restart the function sleeps for the
    service's ``restart_delay`` (default 5 seconds) and then asks
    ``systemctl is-active`` whether the unit is up.

    Args:
        service: Service name, or a systemd unit name such as 'nginx'.

    Raises:
        SystemExit: with status 1 when the unit is not active after the
            delay; the ``systemctl status`` report is written to stderr
            first.
    """
    wait = restart_delay.get(service, 5)
    systemd_service = systemd_services.get(service, service)
    log.info(f'going to restart service: {service}')
    cmd = ['/usr/bin/systemctl', 'restart', systemd_service]
    log.info(f'cmd to restart service: "{" ".join(cmd)}"')
    p = subprocess.run(cmd, capture_output=True)
    if p.returncode != 0:
        # Not fatal by itself: the is-active check below decides. The error
        # text is logged because it used to be captured and thrown away.
        restart_err = p.stderr.decode('UTF-8', errors='replace').strip()
        log.error(f'systemctl restart exited with {p.returncode}: '
                  f'{restart_err}')
    log.info(f'sleeping {wait} seconds')
    time.sleep(wait)
    cmd = ['/usr/bin/systemctl', 'is-active', '--quiet', systemd_service]
    log.info(f'cmd to check service status: "{" ".join(cmd)}"')
    p = subprocess.run(cmd)
    if p.returncode != 0:
        log.error(f'{service} failed to restart:')
        cmd = ['/usr/bin/systemctl', 'status', systemd_service]
        log.info(f'cmd to show status of service: "{" ".join(cmd)}"')
        p = subprocess.run(cmd, capture_output=True)
        # `systemctl status` writes its report to stdout, so forward both
        # streams; printing stderr alone showed nothing useful.
        sys.stderr.write(p.stdout.decode('UTF-8', errors='replace'))
        sys.stderr.write(p.stderr.decode('UTF-8', errors='replace'))
        sys.exit(1)
    log.info(f'{service} has restarted OK')
def pfx_gen(service):
    """Export the Let's Encrypt key and certificates of *service* as PKCS#12.

    The export password is read from the ``pass`` store of ``server_user``
    (entry ``ssl/<service>``). ``openssl pkcs12 -export`` is then driven
    through pexpect, answering both export password prompts. The output
    path comes from ``pfx_key_path``; the file is owned by the service's
    user (``users`` mapping, else the service name) with mode 0o600.

    Args:
        service: Service name; must have an entry in ``pfx_key_path``.

    Raises:
        SystemExit: if the password cannot be read, the service has no
            configured output path, or openssl does not show the expected
            prompts.
        KeyError: from ``pwd.getpwnam`` if the owning user does not exist.
    """
    log.info(f'generate pfx file for {service}')
    cmd = ['/usr/bin/su', '-l', server_user,
           '/usr/bin/pass', 'show', f'ssl/{service}']
    log.info(f'cmd to get password to encrypt private key: "{" ".join(cmd)}"')
    p = subprocess.run(cmd, capture_output=True)
    export_pw = p.stdout.decode('UTF-8').strip()
    if not export_pw:
        sys.exit(f'Couldnt get ssl export password for {service}')
    try:
        # A Path (not a bare str) is needed for .touch()/.chmod() below.
        pkp = pathlib.Path(pfx_key_path[service])
    except KeyError:
        sys.exit(f'{service} has no defined private key path.')
    cert_dir = f'/etc/letsencrypt/live/{service}.{domain}'
    cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', str(pkp),
           '-inkey', f'{cert_dir}/privkey.pem',
           '-in', f'{cert_dir}/cert.pem',
           '-certfile', f'{cert_dir}/chain.pem']
    user = users.get(service, service)
    account = pwd.getpwnam(user)
    uid = account.pw_uid
    gid = account.pw_gid
    # Make sure the output file exists before changing owner and mode, so a
    # first-time export does not fail with FileNotFoundError.
    # NOTE(review): assumes openssl rewrites the existing file in place and
    # so keeps this owner/mode - confirm.
    pkp.touch(mode=0o600, exist_ok=True)
    log.info(f'changing owner of {pkp} - uid: {uid} gid: {gid}')
    os.chown(pkp, uid, gid)
    log.info(f'chmod {pkp} to 0o600')
    pkp.chmod(0o600)
    log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"')
    p = pexpect.spawnu(' '.join(cmd))
    p.logfile = sys.stderr
    # In each expect() below index 0 is the wanted outcome; anything else
    # (EOF or TIMEOUT) aborts.
    res = p.expect(['Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT])
    if res > 0:
        sys.exit('Failed to run openssl to generate '
                 f'pkcs12 keys for {service}: {p.before}')
    p.sendline(export_pw)
    log.info('send password to encrypt private key')
    res = p.expect(['Verifying - Enter Export Password:',
                    pexpect.EOF, pexpect.TIMEOUT])
    if res > 0:
        sys.exit('Failed to run openssl to generate '
                 f'pkcs12 keys for {service}: {p.before}')
    p.sendline(export_pw)
    log.info(f'send password to encrypt private key...again')
    # openssl should now finish on its own; EOF is the success case here.
    res = p.expect([pexpect.EOF, pexpect.TIMEOUT])
    if res > 0:
        # The f prefix used to sit on the wrong literal, so the placeholders
        # were printed verbatim instead of being filled in.
        sys.exit('Failed to run openssl to generate '
                 f'pkcs12 keys for {service}: {p.before}')
    log.info(f'this did not explicitly fail')
def get_ssh_pw():
    """Return the passphrase of the firewall SSH key.

    The passphrase is read from the ``ssh/autofirewall`` entry of the
    ``pass`` store belonging to ``server_user`` (queried through ``su -l``).

    Returns:
        The passphrase with surrounding whitespace stripped.

    Raises:
        SystemExit: if the command yields no output.
    """
    log.info('Get SSH decryption pw')
    pass_cmd = ['/usr/bin/su', '-l', server_user,
                '/usr/bin/pass', 'show', 'ssh/autofirewall']
    log.info(f'cmd: "{" ".join(pass_cmd)}"')
    completed = subprocess.run(pass_cmd, capture_output=True)
    passphrase = completed.stdout.decode('UTF-8').strip()
    if passphrase:
        log.info('Got SSH decryption pw')
        return passphrase
    # Empty output means the lookup failed.
    sys.exit('Could not get decryption passpharase')
def reset_challenge_path():
    """Recreate an empty ACME challenge directory under the nginx web root.

    Any existing challenge directory is removed with its contents, then a
    fresh one is created with mode 0o755.

    Returns:
        ``pathlib.Path`` of the challenge directory.

    Raises:
        OSError: propagated from the filesystem calls.
    """
    challenge_path = pathlib.Path(
        '/usr/share/nginx/html/.well-known/acme-challenge/')
    if challenge_path.is_dir():
        recurse_rmdir(challenge_path)
        log.info('Challenge path deleted')
    # parents=True: on a host where '.well-known' does not exist yet, a plain
    # mkdir() would fail with FileNotFoundError.
    challenge_path.mkdir(parents=True)
    log.info('Challenge path created')
    # Explicit chmod so the final mode does not depend on the umask.
    challenge_path.chmod(0o755)
    log.info('Challenge path chmodded')
    return challenge_path
def run_cert_bot(fqdn, service, challenge_path, decrypt_pp):
    """Drive an interactive ``certbot certonly --manual`` run for *fqdn*.

    Steps, in order:
      1. Start certbot and answer its prompts until it prints the HTTP
         challenge data and file name.
      2. Write the challenge file into *challenge_path*.
      3. Enable the ``<service>-le`` nginx site, open port 80 on the
         firewall and restart nginx.
      4. Let certbot continue and wait for it to finish.
      5. Always close port 80 again, remove the nginx symlink and restart
         nginx, including when step 4 fails.
      6. Run the service-specific hook (a module-level function named like
         the service) if one exists, then chown the certificate directories
         to the service's user.

    Args:
        fqdn: Fully qualified domain name to request the certificate for.
        service: Service name, used for the nginx site name, the firewall
            call, the hook lookup and the certificate paths.
        challenge_path: ``pathlib.Path`` of the directory that receives the
            challenge file.
        decrypt_pp: SSH key passphrase forwarded to ``firewall_mod``.

    Raises:
        SystemExit: on timeouts, unexpected certbot output, file write
            errors or failures in the helper functions.
    """
    cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', fqdn]
    log.info(f'certbot cmd: "{" ".join(cmd)}"')
    cb = pexpect.spawnu(' '.join(cmd))
    cb.logfile = sys.stderr
    # Group 1 captures the challenge data certbot asks us to publish.
    challenge_prompt = (
        'Create a file containing just this data:\r\n\r\n([^\r]+)\r')
    while True:
        res = cb.expect(
            [challenge_prompt,
             ('You have an existing certificate that has exactly the '
              "same domains or certificate name you requested and isn't "
              'close to expiry'),
             # expect() patterns are regular expressions: the parentheses
             # must be escaped or the literal "(U)pdate ..." prompt can
             # never match.
             r'\(U\)pdate key type/\(K\)eep existing key type:',
             pexpect.TIMEOUT, pexpect.EOF], timeout=20)
        if res > 2:
            sys.exit('Timed out')
        if res == 2:
            cb.sendline('U')
            continue
        if res == 1:
            log.info('Current cert is not yet expired')
            res = cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF])
            if res > 0:
                sys.exit('Timed out in setup with existing cert')
            cb.sendline('2')
            res = cb.expect(
                [challenge_prompt, pexpect.TIMEOUT, pexpect.EOF], timeout=20)
            # NOTE(review): only EOF (index 2) is fatal here. On TIMEOUT
            # (index 1) the loop starts over and checks for the other
            # prompts again - confirm this is intended.
            if res > 1:
                sys.exit('Timed out')
        if res == 0:
            break
    data = cb.match.group(1)
    log.info(f'secret data: {data}')
    log.info('the data string and location for the shared secret are known')
    long_match = ('And make it available on your web server at this URL:'
                  '\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r')
    res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit('Timed out')
    filename = cb.match.group(1)
    log.info(f'filename of secret: {filename}')
    res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0)
    data_file = challenge_path / pathlib.Path(filename)
    try:
        with open(data_file, 'w') as f:
            f.write(data)
    except OSError:
        # Only filesystem errors are turned into this message; a bare
        # except would also swallow KeyboardInterrupt and SystemExit.
        sys.exit(f'Failed to write {data_file}')
    log.info('created secret file with secret data')
    data_file.chmod(0o644)
    log.info('and chmodded')
    symlink_name = pathlib.Path(f'{service}-le')
    log.info(f'nginx symlink: {symlink_name}')
    nx_conf = pathlib.Path('/etc/nginx')
    avail_path = nx_conf / pathlib.Path('sites-available')
    enabled_path = nx_conf / pathlib.Path('sites-enabled')
    service_available_file = avail_path / symlink_name
    log.info(f'nginx service avail symlink: {service_available_file}')
    service_enabled_symlink = enabled_path / symlink_name
    log.info(f'nginx service enabled symlink: {service_enabled_symlink}')
    if not service_enabled_symlink.is_symlink():
        service_enabled_symlink.symlink_to(service_available_file)
        log.info('created symlink to enable service')
    log.info(f'open port 80 to {service}')
    firewall_mod('HTTP_UP', service, decrypt_pp)
    try:
        restart('nginx')
        cb.sendline()
        log.info(f'sent <enter> to certbot to continue process')
        res = cb.expect([pexpect.EOF])
        log.info(f'cerbot completed. final output: {cb.before}')
        output_text = str(cb.before)
        if 'failed' in output_text:
            sys.exit('Something went wrong')
    finally:
        # Runs on failure too, so port 80 and the temporary nginx site are
        # never left exposed after an aborted renewal.
        log.info(f'close port 80 to {service}')
        firewall_mod('HTTP_DOWN', service, decrypt_pp)
        service_enabled_symlink.unlink()
        if service_enabled_symlink.is_symlink():
            sys.exit(f'Could not unlink {service_enabled_symlink}')
        log.info('removed symlink in nginx to disable HTTP')
        restart('nginx')
    key_path = pathlib.Path('/etc/letsencrypt')
    live = key_path / pathlib.Path(f'live/{service}.{domain}')
    archive = key_path / pathlib.Path(f'archive/{service}.{domain}')
    log.info(f'live keypath: {live}')
    log.info(f'archive keypath: {archive}')
    # Service hooks are module-level functions named after the service;
    # a direct lookup replaces eval() and skips non-callable globals.
    hook = globals().get(service)
    if callable(hook):
        log.info(f'{service} has a service specific function to run')
        hook()
        log.info(f'{service} specific work complete.')
    user = users.get(service, service)
    log.info(f'service {service} has user {user}')
    account = pwd.getpwnam(user)
    uid = account.pw_uid
    gid = account.pw_gid
    # chown after custom service in case pfx or other key is generated
    log.info(f'uid: {uid} gid: {gid}')
    os.chown(live, uid, gid)
    log.info(f'live keypath chmodded')
    os.chown(archive, uid, gid)
    log.info(f'archive keypath chmodded')
    for dirpath, _, names in os.walk(archive):
        os.chown(dirpath, uid, gid)
        for name in names:
            os.chown(os.path.join(dirpath, name), uid, gid)
    log.info(f'chmodded new keys from certbot')
def main(args):
    """Renew the certificate of one service and restart that service.

    Args:
        args: Command-line arguments without the program name; exactly one
            entry naming a service from ``supported_services`` is expected.

    Raises:
        SystemExit: on bad arguments, when not run as root, or when any
            step of the renewal fails.
    """
    # Set LOGLEVEL=INFO in the environment for verbose output.
    logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING"))
    log.info(f'program start: {sys.argv}')
    usage = f'Give a service to renew: {", ".join(supported_services)} '
    if len(args) != 1 or args[0] not in supported_services:
        sys.exit(usage)
    service = args[0]
    if os.getuid() != 0:
        sys.exit('Run as root')
    decrypt_pp = get_ssh_pw()
    challenge_path = reset_challenge_path()
    fqdn = f'{service}.{domain}'
    log.info(f'fqdn: {fqdn}')
    # Real run. For a dry run certbot would be invoked with '--dry-run'
    # in front of 'certonly'.
    run_cert_bot(fqdn, service, challenge_path, decrypt_pp)
    restart(service)
# Module-level logger used by every function above; it is bound here, before
# main() is called, so the functions find it at call time.
log = logging.getLogger(__name__)
# Run the renewal only when executed as a script, passing the command-line
# arguments without the program name.
if __name__ == '__main__':
    main(sys.argv[1:])