ssl-update/ssl-update.py
2024-03-22 21:18:00 -04:00

453 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Automation for cert renewal.
assumptions:
* firewall has access configured for specified key
* firewall sshd config contains: `AcceptEnv ssl_service state`
* firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x
"""
# TODO: cover this as well:
# irc sasl cert will expire in 50 years
# regenerate cert:
# openssl req -x509 -new -newkey ed25519 -sha256 -days 18265 -nodes -out user.pem -keyout user.pem
# get fingerprint (librechat at this time OK with sha512 fingerprint)
# openssl x509 -in user.pem -noout -fingerprint -sha512 | awk -F= '{gsub(":",""); print tolower ($2)}'
# get fingerprint (OFTC at this time _wont_ take sha512, only sha1)
# openssl x509 -in user.pem -noout -fingerprint -sha1 | awk -F= '{gsub(":",""); print tolower ($2)}'
# /msg nickserv cert add $fingerprint
# can't certfp auth external on hackint. tried ed25519/rsa 4096/rsa 2048 with both sha512 and sha1 fingerprints, no luck..
import datetime
import logging
import getpass
import os
import pathlib
import pexpect
import pwd
import subprocess
import sys
import time
# Services this script knows how to renew; the command-line argument must be
# one of these names.
supported_services = [
    'chat',
    'git',
    'irc',
    'jellyfin',
    'nextcloud',
    'photoprism',
    'plex',
    'read',
    'sync',
    'www',
]

# Seconds to wait after restarting a service before checking that it is
# active.  Services not listed here use the default inside restart().
restart_delay = {
    'plex': 10,
}

# Where the PKCS#12 bundle is written for services that need one.
pfx_key_path = {
    'plex': '/data/plex/certs/certificate.pfx',
    'jellyfin': '/etc/letsencrypt/live/jellyfin.drheck.dev/jellyfin.pfx',
}

# Cert owning user if different than the name of the service.
# Set to "disable" to not chown at all.
users = {
    'git': 'gitea',
    'read': 'http',
    'chat': 'synapse',
    'sync': 'syncv3',
    'irc': 'disable',
}

# systemd unit names that don't match the service name
# (service: systemd_service).
# Use "disable" to not attempt to restart any service.
systemd_services = {
    'git': 'gitea',
    'plex': 'plexmediaserver',
    'read': 'kavita',
    'nextcloud': 'php-fpm',
    'chat': 'synapse',
    'sync': 'sliding-sync',
    'irc': 'disable',
}

# Remote host settings; not read by any code visible in this file.
remote = {
    'rng': {
        'user': 'luke',
        'auth': 'id_m',
    },
}

# Router (firewall) ssh connection details.
router = 'danknasty'
router_user = 'luke131'
router_key = '/root/.ssh/id_autofirewall'

# Local account whose `pass` store holds the secrets this script needs.
server_user = 'luke'
def firewall_mod(state, service, decrypt_pp):
    """Run ``ssl-update.sh`` on the router to change HTTP access for a service.

    The requested state and the service name are exported into this
    process's environment and forwarded over ssh with ``SendEnv``.

    Args:
        state: value exported as ``state``; callers in this file pass
            ``'HTTP_UP'`` or ``'HTTP_DOWN'``.
        service: service name, exported as ``ssl_service``.
        decrypt_pp: passphrase that unlocks ``router_key``.

    Exits the program if ssh never asks for the key passphrase or if the
    remote script does not print ``success``.
    """
    # Forward the state the caller asked for (it used to be pinned to
    # 'HTTP_UP', so the port could never be closed again).
    os.environ['state'] = state
    os.environ['ssl_service'] = service
    cmd = ['/usr/bin/ssh', '-i', router_key, '-o',
           'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', router_user,
           router, 'doas', '-n', '/usr/local/bin/ssl-update.sh']
    log.info(f'env for fw: state: {os.environ["state"]}')
    log.info(f'env for fw: ssl_service: {service}')
    log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"')
    p = pexpect.spawnu(' '.join(cmd))
    # Mirror only what the child prints.  `logfile` would also record
    # everything we send, i.e. the key passphrase.
    p.logfile_read = sys.stderr
    log.info(f'key string: {router_key}')
    res = p.expect([f'''Enter passphrase for key ['"]{router_key}['"]:''',
                    pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit('Couldnt send decryption key to ssh.')
    p.sendline(decrypt_pp)
    res = p.expect(['success', pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit(f'Failed. error: {p.before}')
    # 'HTTP_UP' -> 'up'; a state without an underscore is logged as-is
    # instead of raising IndexError.
    state_print = state.rpartition('_')[2].lower()
    log.info(f'Turned {state_print} HTTP for {service}')
def recurse_rmdir(directory):
    """Delete *directory* and everything beneath it.

    Args:
        directory: path (``str`` or ``pathlib.Path``) of an existing
            directory.

    Symbolic links are removed as links and never followed, so nothing
    outside *directory* is touched.
    """
    directory = pathlib.Path(directory)
    for item in directory.iterdir():
        # is_dir() follows symlinks; without the is_symlink() guard a link
        # to a directory would have its *target* emptied.
        if item.is_dir() and not item.is_symlink():
            recurse_rmdir(item)
        else:
            item.unlink()
    directory.rmdir()
def jellyfin():
    """Service-specific step for jellyfin: build its pfx file via pfx_gen."""
    service_name = 'jellyfin'
    log.info('custom function for jellyfin actions')
    pfx_gen(service_name)
def plex():
    """Service-specific step for plex: build its pfx file via pfx_gen."""
    service_name = 'plex'
    log.info('custom function for plex actions')
    pfx_gen(service_name)
def restart(service):
    """Restart the systemd unit behind *service* and verify it is active.

    Args:
        service: service name.  It is mapped to a unit through
            ``systemd_services`` (falling back to the name itself); a
            mapping of ``'disable'`` means no action is taken.

    After the restart the function sleeps ``restart_delay[service]``
    seconds (5 when the service has no entry) and then runs
    ``systemctl is-active``.  If the unit is not active, the output of
    ``systemctl status`` is written to stderr and the program exits with
    status 1.
    """
    wait = restart_delay.get(service, 5)
    systemd_service = systemd_services.get(service, service)
    if systemd_service == 'disable':
        log.info(f'will perform no service action for {service}')
        return

    log.info(f'going to restart service: {service}')
    cmd = ['/usr/bin/systemctl', 'restart', systemd_service]
    log.info(f'cmd to restart service: "{" ".join(cmd)}"')
    # The outcome is judged by the is-active check below, not by this
    # command's exit status.
    subprocess.run(cmd, capture_output=True)

    log.info(f'sleeping {wait} seconds')
    time.sleep(wait)

    cmd = ['/usr/bin/systemctl', 'is-active', '--quiet', systemd_service]
    log.info(f'cmd to check service status: "{" ".join(cmd)}"')
    p = subprocess.run(cmd)
    if p.returncode != 0:
        log.error(f'{service} failed to restart:')
        cmd = ['/usr/bin/systemctl', 'status', systemd_service]
        log.info(f'cmd to show status of service: "{" ".join(cmd)}"')
        p = subprocess.run(cmd, capture_output=True)
        # `systemctl status` prints its report on stdout; echo both streams
        # so the reason for the failure is actually shown.
        for stream in (p.stdout, p.stderr):
            sys.stderr.write(stream.decode('UTF-8', errors='replace'))
        sys.exit(1)
    log.info(f'{service} has restarted OK')
def pfx_gen(service):
    """Export the Let's Encrypt key and certs of *service* as a PKCS#12 file.

    The export password is read from ``pass show ssl/<service>`` (run as
    ``server_user``) and typed into ``openssl pkcs12 -export`` at both of
    its password prompts.  The output path comes from ``pfx_key_path``.

    Args:
        service: service name; must have an entry in ``pfx_key_path``.

    Exits the program if the password cannot be read, the service has no
    configured output path, or openssl does not show the expected prompts.
    """
    log.info(f'generate pfx file for {service}')
    cmd = ['/usr/bin/su', '-l', server_user,
           '/usr/bin/pass', 'show', f'ssl/{service}']
    log.info(f'cmd to get password to encrypt private key: "{" ".join(cmd)}"')
    p = subprocess.run(cmd, capture_output=True)
    export_pw = p.stdout.decode('UTF-8').strip()
    if not export_pw:
        sys.exit(f'Couldnt get ssl export password for {service}')
    try:
        pkp = pfx_key_path[service]
    except KeyError:
        sys.exit(f'{service} has no defined private key path.')

    live_dir = f'/etc/letsencrypt/live/{service}.drheck.dev'
    cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', pkp,
           '-inkey', f'{live_dir}/privkey.pem',
           '-in', f'{live_dir}/cert.pem',
           '-certfile', f'{live_dir}/chain.pem']
    log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"')
    p = pexpect.spawnu(' '.join(cmd))
    # Mirror only openssl's own output; `logfile` would also write the
    # export password we send to stderr.
    p.logfile_read = sys.stderr

    def fail():
        # One message for every failure point.  The final check used to put
        # the f-prefix on the wrong literal and printed the placeholders
        # verbatim.
        sys.exit('Failed to run openssl to generate '
                 f'pkcs12 keys for {service}: {p.before}')

    prompts = (
        ('Enter Export Password:',
         'send password to encrypt private key'),
        ('Verifying - Enter Export Password:',
         'send password to encrypt private key...again'),
    )
    for prompt, note in prompts:
        if p.expect([prompt, pexpect.EOF, pexpect.TIMEOUT]) > 0:
            fail()
        p.sendline(export_pw)
        log.info(note)

    # openssl should now finish on its own; anything but EOF is a failure.
    if p.expect([pexpect.EOF, pexpect.TIMEOUT]) > 0:
        fail()
    log.info('this did not explicitly fail')
def get_decrypt_pp(server_user=server_user):
    """Return the passphrase of the router ssh key from the `pass` store.

    Args:
        server_user: local account whose password store is queried with
            ``pass show ssh/autofirewall``.  Defaults to the module-level
            ``server_user`` so the function can be called with no argument.

    Returns:
        The passphrase with surrounding whitespace stripped.

    Exits the program if `pass` produced no output.
    """
    log.info('Get SSH decryption pw')
    cmd = ['/usr/bin/su', '-l', server_user,
           '/usr/bin/pass', 'show', 'ssh/autofirewall']
    log.info(f'cmd: "{" ".join(cmd)}"')
    p = subprocess.run(cmd, capture_output=True)
    decrypt_pp = p.stdout.decode('UTF-8').strip()
    if not decrypt_pp:
        sys.exit('Could not get decryption passpharase')
    log.info('Got SSH decryption pw')
    # The passphrase used to be dropped here, so callers received None.
    return decrypt_pp
def root_check():
    """Exit with the message 'Run as root' unless the process uid is 0."""
    running_as_root = os.getuid() == 0
    if not running_as_root:
        sys.exit('Run as root')
def remake_challenge_path(challenge_path):
    """Recreate *challenge_path* as an empty directory with mode 0o755.

    Args:
        challenge_path: ``pathlib.Path`` of the ACME challenge directory.

    Any existing directory is deleted with its contents first.  Missing
    parent directories are created as well, so a host where the parent
    does not exist yet no longer fails here.
    """
    if challenge_path.is_dir():
        recurse_rmdir(challenge_path)
        log.info('Challenge path deleted')
    # NOTE(review): parents created here get the default (umask-derived)
    # mode; only the leaf directory is chmodded below.
    challenge_path.mkdir(parents=True)
    log.info('Challenge path created')
    challenge_path.chmod(0o755)
    log.info('Challenge path chmodded')
# irc:
# ssh rng
# check if nginx enabled
# yes:
# ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled
# systemctl restart nginx
# no:
# ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled
# systemctl enable --now nginx
# sudo /usr/bin/certbot renew
# rm /etc/nginx/sites-enabled/default
# systemctl disable --now nginx
# or
# systemctl restart nginx
# sudo cp /etc/letsencrypt/live/irc.rng.drheck.dev/fullchain.pem /home/luke/.znc/znc.pem
# cat /etc/letsencrypt/live/irc.rng.drheck.dev/{privkey,fullchain}.pem | sudo tee /home/luke/.znc/znc.pem
# sudo chown luke:luke /home/luke/.znc/znc.pem
class Certbot:
    """Drive an interactive ``certbot certonly --manual`` run with pexpect.

    Typical use: ``set_service()``, then ``certbot_spawn()``, then call
    ``certbot_init()`` repeatedly until ``next_step`` is set.
    """

    def __init__(self):
        self.fqdn = ''         # '<service>.drheck.dev', set by set_service()
        self.service = ''      # service name, set by set_service()
        self.skip = False      # not read by any code visible in this file
        self.cb = None         # pexpect child running certbot
        self.data = None       # challenge file contents printed by certbot
        self.next_step = None  # 'update cert' or 'post cert' once known

    def set_service(self, service):
        """Record the service name and derive its fully qualified name."""
        self.service = service
        self.fqdn = f'{service}.drheck.dev'

    def certbot_spawn(self):
        """Start certbot for the configured service.

        Exits the program if ``set_service()`` has not been called.
        """
        if not self.service:
            sys.exit('Set service first')
        cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', self.fqdn]
        log.info(f'certbot cmd: "{" ".join(cmd)}"')
        self.cb = pexpect.spawnu(' '.join(cmd))
        self.cb.logfile = sys.stderr

    def certbot_init(self):
        """Handle the next prompt certbot shows and record what to do next.

        Outcomes, by what certbot printed:

        * challenge data: stored in ``self.data``; ``next_step`` becomes
          ``'update cert'``.
        * existing certificate not close to expiry: option ``1`` is sent
          and ``next_step`` becomes ``'post cert'``.
        * key type question: ``U`` is sent and ``next_step`` is left
          unset, so the caller is expected to call this method again.
        * timeout or EOF: the program exits.
        """
        # The list index of each pattern is the value expect() returns.
        patterns = [
            'Create a file containing just this data:\r\n\r\n([^\r]+)\r',
            ('You have an existing certificate that has exactly the '
             "same domains or certificate name you requested and isn't "
             'close to expiry'),
            r'\(U\)pdate key type\/\(K\)eep existing key type:',
            pexpect.TIMEOUT,
            pexpect.EOF,
        ]
        res = self.cb.expect(patterns, timeout=20)
        if res > 2:
            log.error(self.cb.before)
            sys.exit('Timed out. did not see any expected output')
        if res == 2:
            # Answer the key type question; the caller's loop calls us
            # again for the prompt that follows.
            self.cb.sendline('U')
            return
        if res == 0:
            self.data = self.cb.match.group(1)
            self.next_step = 'update cert'
            return

        # res == 1: certbot reports that the current certificate is still
        # valid and offers a menu.
        log.info('Current cert is not yet expired')
        res = self.cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF])
        if res > 0:
            sys.exit('Timed out in setup with existing cert')
        self.cb.sendline('1')
        self.next_step = 'post cert'
# Directory nginx serves ACME HTTP-01 challenge files from.
_ACME_CHALLENGE_DIR = '/usr/share/nginx/html/.well-known/acme-challenge/'


def main(args):
    """Renew the certificate of the service named on the command line.

    Args:
        args: command-line arguments without the program name; ``args[0]``
            must be one of ``supported_services``.

    Exits the program when not run as root, when no or an unknown service
    is given, or when any step of the renewal fails.
    """
    logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
    root_check()
    log.info(f'program start: {sys.argv}')
    # args excludes the program name, so "nothing given" is an empty list.
    if not args:
        sys.exit(f'Give a service(s) to renew: {", ".join(supported_services)}')
    service = args[0]
    if service not in supported_services:
        sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')

    # pass phrase for ssh key decryption
    decrypt_pp = get_decrypt_pp(server_user)
    challenge_path = pathlib.Path(_ACME_CHALLENGE_DIR)
    # delete any crud from here and make sure correct permissions
    remake_challenge_path(challenge_path)

    # Dry run:
    # cmd = ['/usr/bin/certbot', '--dry-run', 'certonly', '--manual', '-d', fqdn]
    certbot = Certbot()
    certbot.set_service(service)
    log.info(f'fqdn: {certbot.fqdn}')
    certbot.certbot_spawn()
    # certbot_init() handles one prompt per call; repeat until it has
    # decided what happens next.
    while not certbot.next_step:
        certbot.certbot_init()

    if certbot.next_step == 'update cert':
        # TODO: planned split of create_secret() into smaller steps:
        #   certbot.write_secret(); nginx.enable_secret_http();
        #   firewall.open(service); certbot.secret_ready()
        # None of those exist yet, so run the complete flow in one call.
        create_secret(certbot, challenge_path, decrypt_pp)
    # NOTE(review): 'post cert' (existing cert kept) triggers no further
    # action here - confirm whether the install steps should run for it.


def create_secret(self, challenge_path=None, decrypt_pp=None):
    """Answer certbot's manual HTTP challenge, then install the new cert.

    Steps: write the challenge file, enable the ``<service>-le`` nginx
    site, open HTTP on the router, let certbot finish, undo the nginx and
    router changes, run the service-specific function (if one exists),
    chown the certificate directories and restart the service.

    Args:
        self: ``Certbot`` instance whose ``certbot_init()`` has captured
            the challenge data; ``cb``, ``data``, ``fqdn`` and ``service``
            are read from it.
        challenge_path: directory for the challenge file; defaults to the
            directory ``main`` uses.
        decrypt_pp: passphrase of the router ssh key; fetched with
            ``get_decrypt_pp`` when omitted.

    Exits the program if certbot does not print the challenge URL, the
    challenge file cannot be written, or certbot's final output contains
    ``failed``.
    """
    cb = self.cb
    data = self.data
    fqdn = self.fqdn
    service = self.service
    if challenge_path is None:
        challenge_path = pathlib.Path(_ACME_CHALLENGE_DIR)
    if decrypt_pp is None:
        decrypt_pp = get_decrypt_pp(server_user)

    log.info(f'secret data: {data}')
    log.info('the data string and location for the shared secret are known')
    long_match = ('And make it available on your web server at this URL:'
                  '\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r')
    res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF])
    if res > 0:
        sys.exit('Timed out')
    filename = cb.match.group(1)
    log.info(f'filename of secret: {filename}')
    # Consume the prompt.  timeout=0 raced with certbot's output and raised
    # pexpect.TIMEOUT; wait the default time and carry on either way.
    res = cb.expect(['Press Enter to Continue', pexpect.EOF, pexpect.TIMEOUT])
    if res > 0:
        log.warning('did not see the "Press Enter to Continue" prompt')

    data_file = pathlib.Path(challenge_path) / filename
    try:
        data_file.write_text(data)
    except OSError:
        sys.exit(f'Failed to write {data_file}')
    log.info('created secret file with secret data')
    data_file.chmod(0o644)
    log.info('and chmodded')

    symlink_name = pathlib.Path(f'{service}-le')
    log.info(f'nginx symlink: {symlink_name}')
    nx_conf = pathlib.Path('/etc/nginx')
    service_available_file = nx_conf / 'sites-available' / symlink_name
    log.info(f'nginx service avail symlink: {service_available_file}')
    service_enabled_symlink = nx_conf / 'sites-enabled' / symlink_name
    log.info(f'nginx service enabled symlink: {service_enabled_symlink}')
    if not service_enabled_symlink.is_symlink():
        service_enabled_symlink.symlink_to(service_available_file)
        log.info('created symlink to enable service')

    log.info(f'open port 80 to {service}')
    firewall_mod('HTTP_UP', service, decrypt_pp)
    try:
        restart('nginx')
        cb.sendline()
        log.info('sent <enter> to certbot to continue process')
        cb.expect([pexpect.EOF])
        final_output = cb.before
        log.info(f'cerbot completed. final output: {final_output}')
    finally:
        # Always undo the temporary exposure, even when certbot or nginx
        # failed, so port 80 is not left open.
        log.info(f'close port 80 to {service}')
        firewall_mod('HTTP_DOWN', service, decrypt_pp)
        service_enabled_symlink.unlink()
        log.info('removed symlink in nginx to disable HTTP')
        restart('nginx')
    if 'failed' in final_output:
        sys.exit('Something went wrong')

    key_path = pathlib.Path('/etc/letsencrypt')
    live = key_path / f'live/{service}.drheck.dev'
    archive = key_path / f'archive/{service}.drheck.dev'
    log.info(f'live keypath: {live}')
    log.info(f'archive keypath: {archive}')

    # A module-level function named after the service is its custom hook;
    # look it up directly instead of eval()-ing a string.
    hook = globals().get(service)
    if callable(hook):
        log.info(f'{service} has a service specific function to run')
        hook()
        log.info(f'{service} specific work complete.')

    user = service
    if service in users:
        user = users[service]
        log.info(f'service {service} has user {user}')
    if user == 'disable':
        # 'disable' means "do not chown"; it is not an account name.
        log.info(f'not changing ownership of keys for {service}')
    else:
        account = pwd.getpwnam(user)
        uid, gid = account.pw_uid, account.pw_gid
        # chown after custom service in case pfx or other key is generated
        log.info(f'uid: {uid} gid: {gid}')
        os.chown(live, uid, gid)
        log.info('live keypath chmodded')
        os.chown(archive, uid, gid)
        log.info('archive keypath chmodded')
        for dirpath, dirnames, filenames in os.walk(archive):
            os.chown(dirpath, uid, gid)
            for filename in filenames:
                os.chown(os.path.join(dirpath, filename), uid, gid)
        log.info('chmodded new keys from certbot')
    restart(service)
# Module-wide logger.  The functions above look `log` up when they are
# called, so defining it after them works.
log = logging.getLogger(__name__)

if __name__ == '__main__':
    # Pass everything after the program name to main().
    main(sys.argv[1:])