refactor main, new script to update all services

- continue to cut main into functions
- started pylint
- script to update all and log
This commit is contained in:
LuKe Tidd 2024-04-27 11:26:19 -04:00
parent e7a4ac73ab
commit b9b557aae3
Signed by: luke
GPG Key ID: 75D6600BEF4E8E8F
2 changed files with 158 additions and 218 deletions

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
# pylint: disable=W1309 W1203 W1510 C0116 W1401 C0103 R1715 R0914
"""Automation for cert renewal.
assumptions:
@ -7,40 +8,23 @@ assumptions:
* firewall has `ssl-update.sh` copied to /usr/local/bin and chmod +x
"""
# TODO: cover this as well:
# irc sasl cert will expire in 50 years
# regenerate cert:
# openssl req -x509 -new -newkey ed25519 -sha256 -days 18265 -nodes -out user.pem -keyout user.pem
# get fingerprint (librechat at this time OK with sha512 fingerprint)
# openssl x509 -in user.pem -noout -fingerprint -sha512 | awk -F= '{gsub(":",""); print tolower ($2)}'
# get fingerprint (OFTC at this time _wont_ take sha512, only sha1)
# openssl x509 -in user.pem -noout -fingerprint -sha1 | awk -F= '{gsub(":",""); print tolower ($2)}'
# /msg nickserv cert add $fingerprint
# can't certfp auth external on hackint. tried ed25519/rsa 4096/rsa 2048 with both sha512 and sha1 fingerprints, no luck..
import datetime
import logging
import getpass
import os
import pathlib
import pexpect
import pwd
import subprocess
import sys
import time
import pexpect
firewall = 'danknasty'
firewall_user = 'luke131'
firewall_key = '/root/.ssh/id_autofirewall'
server_user = 'luke'
domain = 'drheck.dev'
supported_services = [
'chat',
'git',
'irc',
'jellyfin',
'nextcloud',
'photoprism',
'plex',
'read',
'sync',
'www',
]
'git', 'plex', 'jellyfin', 'photoprism', 'nextcloud', 'read', 'www', 'chat', 'sync']
restart_delay = {
'plex': 10
@ -48,22 +32,20 @@ restart_delay = {
pfx_key_path = {
'plex': '/data/plex/certs/certificate.pfx',
'jellyfin': '/etc/letsencrypt/live/jellyfin.drheck.dev/jellyfin.pfx',
'jellyfin': f'/etc/letsencrypt/live/jellyfin.{domain}/jellyfin.pfx',
}
# Cert owning user if different than the name of the service
# set to disable to not chown at all
users = {
'git': 'gitea',
'read': 'http',
'chat': 'synapse',
'sync': 'syncv3',
'irc': 'disable',
'www': 'http',
}
# systemd service names that don't match the service name
# service : systemd_service
# user service "disable" to not attempt to restart any service
systemd_services = {
'git': 'gitea',
'plex': 'plexmediaserver',
@ -71,7 +53,7 @@ systemd_services = {
'nextcloud': 'php-fpm',
'chat': 'synapse',
'sync': 'sliding-sync',
'irc': 'disable',
'www': 'nginx',
}
@ -82,25 +64,19 @@ remote = {
},
}
router = 'danknasty'
router_user = 'luke131'
router_key = '/root/.ssh/id_autofirewall'
server_user = 'luke'
def firewall_mod(state, service, decrypt_pp):
os.environ['state'] = 'HTTP_UP'
os.environ['ssl_service'] = service
cmd = ['/usr/bin/ssh', '-i', router_key, '-o',
'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', router_user,
router, 'doas', '-n', '/usr/local/bin/ssl-update.sh']
cmd = ['/usr/bin/ssh', '-i', firewall_key, '-o',
'SendEnv=state', '-o', 'SendEnv=ssl_service', '-l', firewall_user,
firewall, 'doas', '-n', '/usr/local/bin/ssl-update.sh']
log.info(f'env for fw: state: {os.environ["state"]}')
log.info(f'env for fw: ssl_service: {service}')
log.info(f'cmd to connect to firewall: "{" ".join(cmd)}"')
p = pexpect.spawnu(' '.join(cmd))
p.logfile = sys.stderr
log.info(f'key string: {router_key}')
res = p.expect([f'''Enter passphrase for key ['"]{router_key}['"]:''',
log.info(f'key string: {firewall_key}')
res = p.expect([f'''Enter passphrase for key ['"]{firewall_key}['"]:''',
pexpect.TIMEOUT, pexpect.EOF])
if res > 0:
sys.exit('Couldnt send decryption key to ssh.')
@ -137,14 +113,10 @@ def restart(service):
wait = restart_delay[service]
else:
wait = 5
try:
systemd_service = systemd_services[service]
except KeyError:
systemd_service = service
if systemd_service == 'disable':
log.info(f'will perform no service action for {service}')
return
log.info(f'going to restart service: {service}')
cmd = ['/usr/bin/systemctl', 'restart', systemd_service]
@ -182,9 +154,9 @@ def pfx_gen(service):
sys.exit(f'{service} has no defined private key path.')
cmd = ['/usr/bin/openssl', 'pkcs12', '-export', '-out', pkp,
'-inkey', f'/etc/letsencrypt/live/{service}.drheck.dev/privkey.pem',
'-in', f'/etc/letsencrypt/live/{service}.drheck.dev/cert.pem',
'-certfile', f'/etc/letsencrypt/live/{service}.drheck.dev/chain.pem']
'-inkey', f'/etc/letsencrypt/live/{service}.{domain}/privkey.pem',
'-in', f'/etc/letsencrypt/live/{service}.{domain}/cert.pem',
'-certfile', f'/etc/letsencrypt/live/{service}.{domain}/chain.pem']
log.info(f'cmd to encrypt private key: "{" ".join(cmd)}"')
p = pexpect.spawnu(' '.join(cmd))
p.logfile = sys.stderr
@ -193,7 +165,7 @@ def pfx_gen(service):
sys.exit('Failed to run openssl to generate '
f'pkcs12 keys for {service}: {p.before}')
p.sendline(export_pw)
log.info(f'send password to encrypt private key')
log.info('send password to encrypt private key')
res = p.expect(['Verifying - Enter Export Password:',
pexpect.EOF, pexpect.TIMEOUT])
if res > 0:
@ -208,7 +180,7 @@ def pfx_gen(service):
log.info(f'this did not explicitly fail')
def get_decrypt_pp(server_user):
def get_ssh_pw():
log.info('Get SSH decryption pw')
cmd = ['/usr/bin/su', '-l', server_user,
'/usr/bin/pass', 'show', 'ssh/autofirewall']
@ -218,15 +190,12 @@ def get_decrypt_pp(server_user):
if not decrypt_pp:
sys.exit('Could not get decryption passpharase')
log.info('Got SSH decryption pw')
return decrypt_pp
def root_check():
uid = os.getuid()
if uid != 0:
sys.exit('Run as root')
def remake_challenge_path(challenge_path):
def reset_challenge_path():
challenge_path = pathlib.Path(
'/usr/share/nginx/html/.well-known/acme-challenge/')
if challenge_path.is_dir():
recurse_rmdir(challenge_path)
log.info('Challenge path deleted')
@ -234,187 +203,99 @@ def remake_challenge_path(challenge_path):
log.info('Challenge path created')
challenge_path.chmod(0o755)
log.info('Challenge path chmodded')
return challenge_path
# irc:
# ssh rng
# check if nginx enabled
# yes:
# ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled
# systemctl restart nginx
# no:
# ln /etc/nginx/sites-available/default /etc/nginx/sites-enabled
# systemctl enable --now nginx
# sudo /usr/bin/certbot renew
# rm /etc/nginx/sites-enabled/default
# systemctl disable --now nginx
# or
# systemctl restart nginx
# sudo cp /etc/letsencrypt/live/irc.rng.drheck.dev/fullchain.pem /home/luke/.znc/znc.pem
# cat /etc/letsencrypt/live/irc.rng.drheck.dev/{privkey,fullchain}.pem | sudo tee /home/luke/.znc/znc.pem
# sudo chown luke:luke /home/luke/.znc/znc.pem
class Certbot:
def __init__(self):
self.fqdn = ''
self.service = ''
self.skip = False
self.cb = None
self.data = None
self.next_step = None
def set_service(self, service):
self.service = service
self.fqdn = f'{service}.drheck.dev'
def certbot_spawn(self):
if not self.service:
sys.exit('Set service first')
cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', self.fqdn]
log.info(f'certbot cmd: "{" ".join(cmd)}"')
self.cb = pexpect.spawnu(' '.join(cmd))
self.cb.logfile = sys.stderr
def certbot_init(self):
expected_results = {
0: 'Create a file containing just this data:\r\n\r\n([^\r]+)\r',
1: ('You have an existing certificate that has exactly the '
"same domains or certificate name you requested and isn't "
'close to expiry'),
2: '\(U\)pdate key type\/\(K\)eep existing key type:',
3: pexpect.TIMEOUT,
4: pexpect.EOF,
}
res_list = []
dict_len = len(expected_results.keys())
for i in range(dict_len):
res_list.append(expected_results[i])
res = self.cb.expect(res_list, timeout=20)
def run_cert_bot(fqdn, service, challenge_path, decrypt_pp):
cmd = ['/usr/bin/certbot', 'certonly', '--manual', '-d', fqdn]
log.info(f'certbot cmd: "{" ".join(cmd)}"')
cb = pexpect.spawnu(' '.join(cmd))
cb.logfile = sys.stderr
while True:
res = cb.expect(
['Create a file containing just this data:\r\n\r\n([^\r]+)\r',
('You have an existing certificate that has exactly the '
"same domains or certificate name you requested and isn't "
'close to expiry'),'\(U\)pdate key type\/\(K\)eep existing key type:',
pexpect.TIMEOUT, pexpect.EOF], timeout=20)
if res > 2:
log.error(p.before)
sys.exit('Timed out. did not see any expected output')
sys.exit('Timed out')
if res == 2:
self.cb.sendline('U')
cb.sendline('U')
continue
if res == 0:
self.data = self.cb.match.group(1)
self.next_step = 'update cert'
break
if res == 1:
log.info('Current cert is not yet expired')
res = self.cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF])
res = cb.expect_exact(['cancel):', pexpect.TIMEOUT, pexpect.EOF])
if res > 0:
sys.exit('Timed out in setup with existing cert')
self.cb.sendline('1')
self.next_step = 'post cert'
cb.sendline('2')
res = cb.expect(
['Create a file containing just this data:\r\n\r\n([^\r]+)\r',
pexpect.TIMEOUT, pexpect.EOF], timeout=20)
if res > 1:
sys.exit('Timed out')
if res == 0:
break
data = cb.match.group(1)
log.info(f'secret data: {data}')
log.info('the data string and location for the shared secret are known')
long_match = ('And make it available on your web server at this URL:'
'\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r')
res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF])
if res > 0:
sys.exit('Timed out')
filename = cb.match.group(1)
log.info(f'filename of secret: {filename}')
res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0)
def main(args):
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
data_file = challenge_path / pathlib.Path(filename)
try:
with open(data_file, 'w') as f:
f.write(data)
except:
sys.exit(f'Failed to write {data_file}')
log.info('created secret file with secret data')
data_file.chmod(0o644)
log.info('and chmodded')
root_check()
log.info(f'program start: {sys.argv}')
symlink_name = pathlib.Path(f'{service}-le')
log.info(f'nginx symlink: {symlink_name}')
nx_conf = pathlib.Path('/etc/nginx')
avail_path = nx_conf / pathlib.Path('sites-available')
enabled_path = nx_conf / pathlib.Path('sites-enabled')
if len(args) == 1:
sys.exit('Give a service(s) to renew: ',
', '.join(supported_services))
service = args[0]
if service not in supported_services:
sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')
service_available_file = avail_path / symlink_name
log.info(f'nginx service avail symlink: {service_available_file}')
service_enabled_symlink = enabled_path / symlink_name
log.info(f'nginx service enabled symlink: {service_enabled_symlink}')
if not service_enabled_symlink.is_symlink():
service_enabled_symlink.symlink_to(service_available_file)
log.info('created symlink to enable service')
# pass phrase for ssh key decryption
decrypt_pp = get_decrypt_pp()
log.info(f'open port 80 to {service}')
firewall_mod('HTTP_UP', service, decrypt_pp)
restart('nginx')
challenge_path = pathlib.Path(
'/usr/share/nginx/html/.well-known/acme-challenge/')
# delete any crud from here and make sure correct permissions
remake_challenge_path(challenge_path)
cb.sendline()
log.info(f'sent <enter> to certbot to continue process')
res = cb.expect([pexpect.EOF])
log.info(f'cerbot completed. final output: {cb.before}')
log.info(f'fqdn: {fqdn}')
# Dry run:
# cmd = ['/usr/bin/certbot', '--dry-run', 'certonly', '--manual', '-d', fqdn]
output_text = str(cb.before)
if 'failed' in output_text:
sys.exit('Something went wrong')
# TODO: finish this function v
log.info(f'open port 80 to {service}')
firewall_mod('HTTP_DOWN', service, decrypt_pp)
certbot = Certbot(fqdn)
certbot.set_service(service)
certbot.certbot_spawn()
while True:
certbot.certbot_init()
if certbot.next_step:
break
if certbot.next_step == 'update cert':
certbot.write_secret()
nginx.enable_secret_http()
firewall.open(service)
certbot.secret_ready()
service_enabled_symlink.unlink()
log.info('removed symlink in nginx to disable HTTP')
update_firewall
certbot.create_secret()
def create_secret(self):
log.info(f'secret data: {data}')
log.info('the data string and location for the shared secret are known')
long_match = ('And make it available on your web server at this URL:'
'\r\n\r\nhttp://%s/.well-known/acme-challenge/([^\r]+)\r')
res = cb.expect([long_match % (fqdn,), pexpect.TIMEOUT, pexpect.EOF])
if res > 0:
sys.exit('Timed out')
filename = cb.match.group(1)
log.info(f'filename of secret: {filename}')
res = cb.expect(['Press Enter to Continue', pexpect.EOF], timeout=0)
data_file = challenge_path / pathlib.Path(filename)
try:
with open(data_file, 'w') as f:
f.write(data)
except:
sys.exit(f'Failed to write {data_file}')
log.info('created secret file with secret data')
data_file.chmod(0o644)
log.info('and chmodded')
symlink_name = pathlib.Path(f'{service}-le')
log.info(f'nginx symlink: {symlink_name}')
nx_conf = pathlib.Path('/etc/nginx')
avail_path = nx_conf / pathlib.Path('sites-available')
enabled_path = nx_conf / pathlib.Path('sites-enabled')
service_available_file = avail_path / symlink_name
log.info(f'nginx service avail symlink: {service_available_file}')
service_enabled_symlink = enabled_path / symlink_name
log.info(f'nginx service enabled symlink: {service_enabled_symlink}')
if not service_enabled_symlink.is_symlink():
service_enabled_symlink.symlink_to(service_available_file)
log.info('created symlink to enable service')
log.info(f'open port 80 to {service}')
firewall_mod('HTTP_UP', service, decrypt_pp)
restart('nginx')
cb.sendline()
log.info(f'sent <enter> to certbot to continue process')
res = cb.expect([pexpect.EOF])
log.info(f'cerbot completed. final output: {cb.before}')
if 'failed' in cb.before:
sys.exit('Something went wrong')
log.info(f'open port 80 to {service}')
firewall_mod('HTTP_DOWN', service, decrypt_pp)
service_enabled_symlink.unlink()
log.info('removed symlink in nginx to disable HTTP')
restart('nginx')
restart('nginx')
key_path = pathlib.Path('/etc/letsencrypt')
live = key_path / pathlib.Path(f'live/{service}.drheck.dev')
archive = key_path / pathlib.Path(f'archive/{service}.drheck.dev')
live = key_path / pathlib.Path(f'live/{service}.{domain}')
archive = key_path / pathlib.Path(f'archive/{service}.{domain}')
log.info(f'live keypath: {live}')
log.info(f'archive keypath: {archive}')
@ -438,12 +319,36 @@ def main(args):
os.chown(archive, uid, gid)
log.info(f'archive keypath chmodded')
for dirpath, dirnames, filenames in os.walk(archive):
for dirpath, _, filenames in os.walk(archive):
os.chown(dirpath, uid, gid)
for filename in filenames:
os.chown(os.path.join(dirpath, filename), uid, gid)
log.info(f'chmodded new keys from certbot')
def main(args):
# logging.basicConfig(level=os.environ.get("LOGLEVEL", "WARNING"))
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
log.info(f'program start: {sys.argv}')
if len(args) != 1:
sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')
service = args[0]
if service not in supported_services:
sys.exit(f'Give a service to renew: {", ".join(supported_services)} ')
uid = os.getuid()
if uid != 0:
sys.exit('Run as root')
decrypt_pp = get_ssh_pw()
challenge_path = reset_challenge_path()
fqdn = f'{service}.{domain}'
log.info(f'fqdn: {fqdn}')
# Dry run:
# cmd = ['/usr/bin/certbot', '--dry-run', 'certonly', '--manual', '-d', fqdn]
# Real run:
run_cert_bot(fqdn, service, challenge_path, decrypt_pp)
restart(service)

35
update_all Executable file
View File

@ -0,0 +1,35 @@
#!/bin/bash
declare -a services
services+=('git')
services+=('plex')
services+=('jellyfin')
services+=('photoprism')
services+=('nextcloud')
services+=('read')
services+=('www')
services+=('chat')
ts="$(date '+%Y-%m-%d')"
if ! sudo printf 'Sudo successful.\n'; then
printf 'sudo failed.\n' >&2
exit 1
fi
if ! pass show test; then
printf 'failed to access passwords.\n' >&2
exit 1
fi
logdir="/data/logs/ssl/${ts}"
mkdir -p "$logdir"
for service in "${services[@]}"; do
log="${logdir}/${service}.log"
if ! sudo ./ssl-update.py "$service" &> "$log"; then
printf '%s failed. Log: %s\n' "$service" "$log"
else
printf '%s succeeded.\n' "$service"
fi
done