#!/usr/bin/env python3 """Automation for cert renewal. assumptions: * firewall has access configured for specified key * firewall sshd config contains: `AcceptEnv ssl_service state` * firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x """ import datetime import logging import getpass import os import pathlib import pexpect import pwd import subprocess import sys import time supported_services = [ 'chat', 'git', 'irc', 'jellyfin', 'nextcloud', 'photoprism', 'plex', 'read', 'sync', 'www', ] restart_delay = { 'plex': 10 } pfx_key_path = { 'plex': '/data/plex/certs/certificate.pfx', 'jellyfin': '/etc/letsencrypt/live/jellyfin.drheck.dev/jellyfin.pfx', } # Cert owning user if different than the name of the service # set to disable to not chown at all users = { 'git': 'gitea', 'read': 'http', 'chat': 'synapse', 'sync': 'syncv3', 'irc': 'disable', } # systemd service names that don't match the service name # service : systemd_service # user service "disable" to not attempt to restart any service systemd_services = { 'git': 'gitea', 'plex': 'plexmediaserver', 'read': 'kavita', 'nextcloud': 'php-fpm', 'chat': 'synapse', 'sync': 'sliding-sync', 'irc': 'disable', } remote = { 'rng': { 'user': 'luke', 'auth': 'id_m', }, } router = 'danknasty' router_user = 'luke131' router_key = '/root/.ssh/id_autofirewall' server_user = 'luke' def firewall_mod(state, service, decrypt_pp): os.environ['state'] = 'HTTP_UP' os.environ['ssl_service'] = service cmd = ['/usr/bin/ssh', '-i', router_key, '-o', 'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', router_user, router, 'doas', '-n', '/usr/local/bin/ssl-update.sh'] log.info(f'env for fw: state: {os.environ["state"]}') log.info(f'env for fw: ssl_service: {service}') log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"') p = pexpect.spawnu(' '.join(cmd)) p.logfile = sys.stderr log.info(f'key string: {router_key}') res = p.expect([f'''Enter passphrase for key ['"]{router_key}['"]:''', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Couldnt send decryption key to ssh.') p.sendline(decrypt_pp) res = p.expect(['success', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit(f'Failed. error: {p.before}') state_print = state.split('_')[1].lower() log.info(f'Turned {state_print} HTTP for {service}') def recurse_rmdir(directory): directory = pathlib.Path(directory) for item in directory.iterdir(): if item.is_dir(): recurse_rmdir(item) else: item.unlink() directory.rmdir() def jellyfin(): log.info('custom function for jellyfin actions') pfx_gen('jellyfin') def plex(): log.info('custom function for plex actions') pfx_gen('plex') def restart(service): if service in restart_delay: wait = restart_delay[service] else: wait = 5 try: systemd_service = systemd_services[service] except KeyError: systemd_service = service if systemd_service == 'disable': log.info(f'will perform no service action for {service}') return log.info(f'going to restart service: {service}') cmd = ['/usr/bin/systemctl', 'restart', systemd_service] log.info(f'cmd to restart service: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) log.info(f'sleeping {wait} seconds') time.sleep(wait) cmd = ['/usr/bin/systemctl', 'is-active', '--quiet', systemd_service] log.info(f'cmd to check service status: "{" ".join(cmd)}"') p = subprocess.run(cmd) if p.returncode != 0: log.error(f'{service} failed to restart:') cmd = ['/usr/bin/systemctl', 'status', systemd_service] log.info(f'cmd to show status of service: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) stderr = p.stderr.decode('UTF-8') sys.stderr.write(stderr) sys.exit(1) log.info(f'{service} has restarted OK') def pfx_gen(service): log.info(f'generate pfx file for {service}') cmd = ['/usr/bin/su', '-l', server_user, '/usr/bin/pass', 'show', f'ssl/{service}'] log.info(f'cmd to get password to encrypt private key: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) export_pw = p.stdout.decode('UTF-8').strip() if not export_pw: sys.exit(f'Couldnt get ssl export password for {service}') try: pkp = pfx_key_path[service] except KeyError: sys.exit(f'{service} has no defined private key path.') cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', pkp, '-inkey', f'/etc/letsencrypt/live/{service}.drheck.dev/privkey.pem', '-in', f'/etc/letsencrypt/live/{service}.drheck.dev/cert.pem', '-certfile', f'/etc/letsencrypt/live/{service}.drheck.dev/chain.pem'] log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"') p = pexpect.spawnu(' '.join(cmd)) p.logfile = sys.stderr res = p.expect(['Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit('Failed to run openssl to generate ' f'pkcs12 keys for {service}: {p.before}') p.sendline(export_pw) log.info(f'send password to encrypt private key') res = p.expect(['Verifying - Enter Export Password:', pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit('Failed to run openssl to generate ' f'pkcs12 keys for {service}: {p.before}') p.sendline(export_pw) log.info(f'send password to encrypt private key...again') res = p.expect([pexpect.EOF, pexpect.TIMEOUT]) if res > 0: sys.exit(f'Failed to run openssl to generate ' 'pkcs12 keys for {service}: {p.before}') log.info(f'this did not explicitly fail') def get_decrypt_pp(server_user): log.info('Get SSH decryption pw') cmd = ['/usr/bin/su', '-l', server_user, '/usr/bin/pass', 'show', 'ssh/autofirewall'] log.info(f'cmd: "{" ".join(cmd)}"') p = subprocess.run(cmd, capture_output=True) decrypt_pp = p.stdout.decode('UTF-8').strip() if not decrypt_pp: sys.exit('Could not get decryption passpharase') log.info('Got SSH decryption pw') def root_check(): uid = os.getuid() if uid != 0: sys.exit('Run as root') def remake_challenge_path(challenge_path): if challenge_path.is_dir(): recurse_rmdir(challenge_path) log.info('Challenge path deleted') challenge_path.mkdir() log.info('Challenge path created') challenge_path.chmod(0o755) log.info('Challenge path chmodded') # irc: # ssh rng # check if nginx enabled # yes: # ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled # systemctl restart nginx # no: # ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled # systemctl enable --now nginx # sudo /usr/bin/certbot renew # rm /etc/nginx/sites-enabled/default # systemctl disable --now nginx # or # systemctl restart nginx # sudo cp /etc/letsencrypt/live/irc.rng.drheck.dev/fullchain.pem /home/luke/.znc/znc.pem # sudo chown luke:luke /home/luke/.znc/znc.pem class Certbot: def __init__(self): self.fqdn = '' self.service = '' self.skip = False self.cb = None self.data = None self.next_step = None def set_service(self, service): self.service = service self.fqdn = f'{service}.drheck.dev' def certbot_spawn(self): if not self.service: sys.exit('Set service first') cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', self.fqdn] log.info(f'certbot cmd: "{" ".join(cmd)}"') self.cb = pexpect.spawnu(' '.join(cmd)) self.cb.logfile = sys.stderr def certbot_init(self): expected_results = { 0: 'Create a file containing just this data:\r\n\r\n([^\r]+)\r', 1: ('You have an existing certificate that has exactly the ' "same domains or certificate name you requested and isn't " 'close to expiry'), 2: '\(U\)pdate key type\/\(K\)eep existing key type:', 3: pexpect.TIMEOUT, 4: pexpect.EOF, } res_list = [] dict_len = len(expected_results.keys()) for i in range(dict_len): res_list.append(expected_results[i]) res = self.cb.expect(res_list, timeout=20) if res > 2: log.error(p.before) sys.exit('Timed out. did not see any expected output') if res == 2: self.cb.sendline('U') continue if res == 0: self.data = self.cb.match.group(1) self.next_step = 'update cert' break if res == 1: log.info('Current cert is not yet expired') res = self.cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Timed out in setup with existing cert') self.cb.sendline('1') self.next_step = 'post cert' break def main(args): logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) root_check() log.info(f'program start: {sys.argv}') if len(args) == 1: sys.exit('Give a service(s) to renew: ', ', '.join(supported_services)) service = args[0] if service not in supported_services: sys.exit(f'Give a service to renew: {", ".join(supported_services)} ') # pass phrase for ssh key decryption decrypt_pp = get_decrypt_pp() challenge_path = pathlib.Path( '/usr/share/nginx/html/.well-known/acme-challenge/') # delete any crud from here and make sure correct permissions remake_challenge_path(challenge_path) log.info(f'fqdn: {fqdn}') # Dry run: # cmd = ['/usr/bin/certbot', '--dry-run', 'certonly', '--manual', '-d', fqdn] # TODO: finish this function v certbot = Certbot(fqdn) certbot.set_service(service) certbot.certbot_spawn() while True: certbot.certbot_init() if certbot.next_step: break if certbot.next_step == 'update cert': certbot.write_secret() nginx.enable_secret_http() firewall.open(service) certbot.secret_ready() update_firewall certbot.create_secret() def create_secret(self): log.info(f'secret data: {data}') log.info('the data string and location for the shared secret are known') long_match = ('And make it available on your web server at this URL:' '\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r') res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF]) if res > 0: sys.exit('Timed out') filename = cb.match.group(1) log.info(f'filename of secret: {filename}') res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0) data_file = challenge_path / pathlib.Path(filename) try: with open(data_file, 'w') as f: f.write(data) except: sys.exit(f'Failed to write {data_file}') log.info('created secret file with secret data') data_file.chmod(0o644) log.info('and chmodded') symlink_name = pathlib.Path(f'{service}-le') log.info(f'nginx symlink: {symlink_name}') nx_conf = pathlib.Path('/etc/nginx') avail_path = nx_conf / pathlib.Path('sites-available') enabled_path = nx_conf / pathlib.Path('sites-enabled') service_available_file = avail_path / symlink_name log.info(f'nginx service avail symlink: {service_available_file}') service_enabled_symlink = enabled_path / symlink_name log.info(f'nginx service enabled symlink: {service_enabled_symlink}') if not service_enabled_symlink.is_symlink(): service_enabled_symlink.symlink_to(service_available_file) log.info('created symlink to enable service') log.info(f'open port 80 to {service}') firewall_mod('HTTP_UP', service, decrypt_pp) restart('nginx') cb.sendline() log.info(f'sent to certbot to continue process') res = cb.expect([pexpect.EOF]) log.info(f'cerbot completed. final output: {cb.before}') if 'failed' in cb.before: sys.exit('Something went wrong') log.info(f'open port 80 to {service}') firewall_mod('HTTP_DOWN', service, decrypt_pp) service_enabled_symlink.unlink() log.info('removed symlink in nginx to disable HTTP') restart('nginx') key_path = pathlib.Path('/etc/letsencrypt') live = key_path / pathlib.Path(f'live/{service}.drheck.dev') archive = key_path / pathlib.Path(f'archive/{service}.drheck.dev') log.info(f'live keypath: {live}') log.info(f'archive keypath: {archive}') if service in globals(): log.info(f'{service} has a service specific function to run') eval(f'{service}()') log.info(f'{service} specific work complete.') user = service if service in users: user = users[service] log.info(f'service {service} has user {user}') uid = pwd.getpwnam(user).pw_uid gid = pwd.getpwnam(user).pw_gid # chown after custom service in case pfx or other key is generated log.info(f'uid: {uid} gid: {gid}') os.chown(live, uid, gid) log.info(f'live keypath chmodded') os.chown(archive, uid, gid) log.info(f'archive keypath chmodded') for dirpath, dirnames, filenames in os.walk(archive): os.chown(dirpath, uid, gid) for filename in filenames: os.chown(os.path.join(dirpath, filename), uid, gid) log.info(f'chmodded new keys from certbot') restart(service) log = logging.getLogger(__name__) if __name__ == '__main__': main(sys.argv[1:])