#!/usr/bin/env python3 # pylint: disable=W1309 W1203 W1510 C0116 W1401 C0103 R1715 R0914 """Automation for cert renewal. assumptions: * firewall has access configured for specified key * firewall sshd config contains: `AcceptEnv ssl_service state` * firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x """ import logging import os import pathlib import pwd import subprocess import sys import time import pexpect firewall = '10.0.0.1' firewall_user = 'luke' firewall_key = '/root/.ssh/id_autofirewall' server_user = 'luke' domain = 'drheck.dev' supported_services = [ 'git', 'photoprism', 'plex', 'read', 'www', ] restart_delay = { 'plex': 10 } pfx_key_path = { 'plex': pathlib.Path('/data/plex/certs/certificate.pfx'), 'jellyfin': pathlib.Path('/data/jellyfin/ssl/jellyfin.pfx'), } # Cert owning user if different than the name of the service users = { 'git': 'gitea', 'read': 'http', 'chat': 'synapse', 'sync': 'syncv3', 'www': 'http', 'spacebar': '_spacebar', } # systemd service names that don't match the service name # service : systemd_service systemd_services = { 'git': 'gitea', 'plex': 'plexmediaserver', 'read': 'kavita', 'nextcloud': 'php-fpm', 'chat': 'synapse', 'sync': 'sliding-sync', 'www': 'nginx', } remote = { 'rng': { 'user': 'luke', 'auth': 'id_m', }, } def firewall_mod(state, service, decrypt_pp): os.environ['state'] = state os.environ['ssl_service'] = service cmd = ['/usr/bin/ssh', '-i', firewall_key, '-o', 'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', firewall_user, firewall, '/usr/local/bin/ssl-update.sh'] log.info(f'env for fw: state: {os.environ["state"]}') log.info(f'env for fw: ssl_service: {service}') log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"') p = pexpect.spawnu(' '.join(cmd)) p.logfile = sys.stderr log.info(f'key string: {firewall_key}') res = p.expect([f'''Enter passphrase for key ['"]{firewall_key}['"]:''', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Couldnt send decryption key to ssh.') p.sendline(decrypt_pp) res = p.expect(['success', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit(f'Failed. error: {p.before}') state_print = state.split('_')[1].lower() log.info(f'Turned {state_print} HTTP for {service}') def recurse_rmdir(directory): directory = pathlib.Path(directory) for item in directory.iterdir(): if item.is_dir(): recurse_rmdir(item) else: item.unlink() directory.rmdir() def jellyfin(): log.info('custom function for jellyfin actions') pfx_gen('jellyfin') def plex(): log.info('custom function for plex actions') pfx_gen('plex') def restart(service): if service in restart_delay: wait = restart_delay[service] else: wait = 5 try: systemd_service = systemd_services[service] except KeyError: systemd_service = service log.info(f'going to restart service: {service}') cmd = ['/usr/bin/systemctl', 'restart', systemd_service] log.info(f'cmd to restart service: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) log.info(f'sleeping {wait} seconds') time.sleep(wait) cmd = ['/usr/bin/systemctl', 'is-active', '--quiet', systemd_service] log.info(f'cmd to check service status: "{" ".join(cmd)}"') p = subprocess.run(cmd) if p.returncode != 0: log.error(f'{service} failed to restart:') cmd = ['/usr/bin/systemctl', 'status', systemd_service] log.info(f'cmd to show status of service: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) stderr = p.stderr.decode('UTF-8') sys.stderr.write(stderr) sys.exit(1) log.info(f'{service} has restarted OK') def pfx_gen(service): log.info(f'generate pfx file for {service}') cmd = ['/usr/bin/su', '-l', server_user, '/usr/bin/pass', 'show', f'ssl/{service}'] log.info(f'cmd to get password to encrypt private key: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) export_pw = p.stdout.decode('UTF-8').strip() if not export_pw: sys.exit(f'Couldnt get ssl export password for {service}') try: pkp = pfx_key_path[service] except KeyError: sys.exit(f'{service} has no defined private key path.') cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', str(pkp), '-inkey', f'/etc/letsencrypt/live/{service}.{domain}/privkey.pem', '-in', f'/etc/letsencrypt/live/{service}.{domain}/cert.pem', '-certfile', f'/etc/letsencrypt/live/{service}.{domain}/chain.pem'] user = service if service in users: user = users[service] uid = pwd.getpwnam(user).pw_uid gid = pwd.getpwnam(user).pw_gid # chown after custom service in case pfx or other key is generated log.info(f'changing owner of {pkp} - uid: {uid} gid: {gid}') os.chown(pkp, uid, gid) log.info(f'chmod {pkp} to 0o600') pkp.chmod(0o600) log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"') p = pexpect.spawnu(' '.join(cmd)) p.logfile = sys.stderr res = p.expect(['Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit('Failed to run openssl to generate ' f'pkcs12 keys for {service}: {p.before}') p.sendline(export_pw) log.info('send password to encrypt private key') res = p.expect(['Verifying - Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit('Failed to run openssl to generate ' f'pkcs12 keys for {service}: {p.before}') p.sendline(export_pw) log.info(f'send password to encrypt private key...again') res = p.expect([pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit(f'Failed to run openssl to generate ' 'pkcs12 keys for {service}: {p.before}') log.info(f'this did not explicitly fail') def get_ssh_pw(): log.info('Get SSH decryption pw') cmd = ['/usr/bin/su', '-l', server_user, '/usr/bin/pass', 'show', 'ssh/autofirewall'] log.info(f'cmd: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) decrypt_pp = p.stdout.decode('UTF-8').strip() if not decrypt_pp: sys.exit('Could not get decryption passpharase') log.info('Got SSH decryption pw') return decrypt_pp def reset_challenge_path(): challenge_path = pathlib.Path( '/usr/share/nginx/html/.well-known/acme-challenge/') if challenge_path.is_dir(): recurse_rmdir(challenge_path) log.info('Challenge path deleted') challenge_path.mkdir() log.info('Challenge path created') challenge_path.chmod(0o755) log.info('Challenge path chmodded') return challenge_path def run_cert_bot(fqdn, service, challenge_path, decrypt_pp): cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', fqdn] log.info(f'certbot cmd: "{" ".join(cmd)}"') cb = pexpect.spawnu(' '.join(cmd)) cb.logfile = sys.stderr do_update = True matches = [ 'Create a file containing just this data:\r\n\r\n([^\r]+)\r', # 0 ('You have an existing certificate that has exactly the ' # 1 "same domains or certificate name you requested and isn't " 'close to expiry'), '(U)pdate key type/(K)eep existing key type:', # 2 'no action taken', # 3 'No such authorization', # 4 pexpect.TIMEOUT, pexpect.EOF] while True: res = cb.expect(matches, timeout=20) print(f'\nresult: {matches[res]}, {res}') if res > 4: sys.exit('Timed out') if res == 3 or res == 4: do_update = False break if res == 2: cb.sendline('U') continue if res == 1: log.info('Current cert is not yet expired') res = cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Timed out in setup with existing cert') cb.sendline('1') continue # res = cb.expect( # ['Create a file containing just this data:\r\n\r\n([^\r]+)\r', # pexpect.TIMEOUT, pexpect.EOF], timeout=20) # if res > 1: # sys.exit('Timed out') if res == 0: break if do_update: data = cb.match.group(1) log.info(f'secret data: {data}') log.info('the data string and location for the shared secret are known') long_match = ('And make it available on your web server at this URL:' '\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r') res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Timed out') filename = cb.match.group(1) log.info(f'filename of secret: {filename}') res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0) data_file = challenge_path / pathlib.Path(filename) try: with open(data_file, 'w') as f: f.write(data) except: sys.exit(f'Failed to write {data_file}') log.info('created secret file with secret data') data_file.chmod(0o644) log.info('and chmodded') symlink_name = pathlib.Path(f'{service}-le') log.info(f'nginx symlink: {symlink_name}') nx_conf = pathlib.Path('/etc/nginx') avail_path = nx_conf / pathlib.Path('sites-available') enabled_path = nx_conf / pathlib.Path('sites-enabled') service_available_file = avail_path / symlink_name log.info(f'nginx service avail symlink: {service_available_file}') service_enabled_symlink = enabled_path / symlink_name log.info(f'nginx service enabled symlink: {service_enabled_symlink}') if not service_enabled_symlink.is_symlink(): service_enabled_symlink.symlink_to(service_available_file) log.info('created symlink to enable service') log.info(f'open port 80 to {service}') firewall_mod('HTTP_UP', service, decrypt_pp) restart('nginx') cb.sendline() log.info(f'sent to certbot to continue process') res = cb.expect([pexpect.EOF]) log.info(f'cerbot completed. final output: {cb.before}') output_text = str(cb.before) if 'failed' in output_text: sys.exit('Something went wrong') log.info(f'close port 80 to {service}') firewall_mod('HTTP_DOWN', service, decrypt_pp) service_enabled_symlink.unlink() if service_enabled_symlink.is_symlink(): sys.exit(f'Could not unlink {service_enabled_symlink}') log.info('created symlink to enable service') log.info('removed symlink in nginx to disable HTTP') restart('nginx') key_path = pathlib.Path('/etc/letsencrypt') live = key_path / pathlib.Path(f'live/{service}.{domain}') archive = key_path / pathlib.Path(f'archive/{service}.{domain}') log.info(f'live keypath: {live}') log.info(f'archive keypath: {archive}') if service in globals(): log.info(f'{service} has a service specific function to run') eval(f'{service}()') log.info(f'{service} specific work complete.') user = service if service in users: user = users[service] log.info(f'service {service} has user {user}') uid = pwd.getpwnam(user).pw_uid gid = pwd.getpwnam(user).pw_gid # chown after custom service in case pfx or other key is generated log.info(f'uid: {uid} gid: {gid}') os.chown(live, uid, gid) log.info(f'live keypath chmodded') os.chown(archive, uid, gid) log.info(f'archive keypath chmodded') for dirpath, _, filenames in os.walk(archive): os.chown(dirpath, uid, gid) for filename in filenames: os.chown(os.path.join(dirpath, filename), uid, gid) log.info(f'chmodded new keys from certbot') def main(args): # logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING")) logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) log.info(f'program start: {sys.argv}') if len(args) != 1: sys.exit(f'Give a service to renew: {", ".join(supported_services)} ') service = args[0] if service not in supported_services: sys.exit(f'Give a service to renew: {", ".join(supported_services)} ') uid = os.getuid() if uid != 0: sys.exit('Run as root') decrypt_pp = get_ssh_pw() challenge_path = reset_challenge_path() fqdn = f'{service}.{domain}' log.info(f'fqdn: {fqdn}') run_cert_bot(fqdn, service, challenge_path, decrypt_pp) restart(service) log = logging.getLogger(__name__) if __name__ == '__main__': main(sys.argv[1:])