#!/usr/bin/env python3
"""Command-line tool for inspecting and switching devices on a power strip and USB hub.

The port/device maps are hard-coded in ``power.__init__``.  Actually driving
the strip over ssh is only partly implemented; see the notes on the
individual methods.
"""
import argparse
import logging
import pprint
import sys

try:
    import pexpect
except ImportError:
    # pexpect is only needed for the ssh session to the strip; argument
    # parsing and the listing options work without it.
    pexpect = None


def _require_pexpect():
    """Return the pexpect module, exiting with a clear message if it is missing."""
    if pexpect is None:
        sys.exit('pexpect is required to talk to the power strip')
    return pexpect


class power:
    """Hard-coded map of power-strip / USB-hub ports to devices, plus CLI actions."""

    def __init__(self):
        """Build the port->device maps and their device->port inverses."""
        self.strip_handle = None  # no session to the strip yet
        self.usb_port_to_device = {
            '1': 'pi3battery',
        }
        self.usb_device_to_port = self.reverse_usb_ports()
        self.strip_port_to_device = {
            'AA2': '10g_netgear',
            'AA6': 'monitor',
            'AA16': 'ndanknasty',
            'AA17': 'muppet',
            'AA19': 'usb_hub',
            'AA21': 'ultra_arikui',
            'AA22': 'nodegrid',
            'AA23': 'nodegrid:PS2',
            'AA24': 'office_wap_poe',
            'AA25': 'tray_switch',
            'AA26': 'den_wap_poe',
            'AA27': 'studio_wap_poe',
            'AA28': '1g_netgear',
            'AA29': 'danknasty',
        }
        self.strip_device_to_port = self.reverse_strip_ports()
        # NOTE(review): the original comprehension had key and value swapped
        # ({'rpm': device ...}), which collapses to a single entry.  Mapping
        # each strip device to the type tag 'rpm' is the presumed intent;
        # confirm what 'rpm' stands for.
        self.device_types = {
            device: 'rpm' for device in self.strip_device_to_port
        }

    def list_devices(self):
        """Print the name of every device attached to the power strip."""
        print('Configured devices:')
        for device in self.strip_device_to_port:
            print(f' {device}')

    def list_ports(self):
        """Print every configured power-strip port."""
        print('Configured ports:')
        for port in self.strip_port_to_device:
            print(f' {port}')

    def strip_command(self, cmd):
        """Send *cmd* to the power strip (not implemented yet).

        The original body was truncated mid-statement and relied on a
        ``strip_connect`` method that does not exist, so it could only fail
        with ``AttributeError``.  Fail explicitly instead.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError(
            'sending commands to the power strip is not implemented yet'
        )

    def process(self, args):
        """Run the action selected on the command line.

        Args:
            args: parsed arguments (``argparse.Namespace``).  ``debug``,
                ``on``, ``off`` and ``listall`` are required attributes; the
                remaining options are looked up with ``getattr`` so a
                namespace that lacks them is still accepted.
        """
        if args.debug:
            logging.basicConfig(level=logging.DEBUG)
            logging.debug('Logging set to DEBUG')
            logging.debug(args)

        if args.on:
            logging.debug('on: %s', args.on)
            self.on(args.on)
        if args.off:
            logging.debug('off: %s', args.off)
            self.off(args.off)
        if args.listall:
            logging.debug('displaying all config')
            self.list_all()

        # These options were parsed but never dispatched, although their
        # handlers exist.
        status_target = getattr(args, 'status', None)
        if status_target:
            logging.debug('status: %s', status_target)
            self.status(status_target)
        if getattr(args, 'listdev', False):
            self.list_devices()
        if getattr(args, 'listport', False):
            self.list_ports()

        # No handler exists for these; say so rather than silently succeed.
        for option in ('reboot', 'portoff', 'porton'):
            if getattr(args, option, None):
                logging.warning('--%s is not implemented yet', option)

    def list_all(self):
        """Print the device->port maps for the power strip and the USB hub."""
        print()
        print('Power Strip:')
        for device, ports in self.strip_device_to_port.items():
            print(device, ', '.join(ports))
        print()
        print('USB Hub:')
        for device, port in self.usb_device_to_port.items():
            print(device, port)

    def _known_devices(self, device_list):
        """Split a comma-separated device list and validate every name.

        Logs an error, prints the configured devices and exits with status 1
        on the first name that is not attached to the power strip.
        """
        devices = [name.strip() for name in device_list.split(',')]
        for device in devices:
            if device not in self.strip_device_to_port:
                logging.error('"%s" not a known device.', device)
                self.list_devices()
                sys.exit(1)
        return devices

    def on(self, device_list):
        """Validate a comma-separated list of devices to turn on.

        Only validation happens here; no command is sent to the strip yet.
        Exits with status 1 if any device is unknown.
        """
        devices = self._known_devices(device_list)
        logging.debug('turning on %s', devices)

    def off(self, args):
        """Validate a comma-separated list of devices to turn off.

        Previously this exited with ``'bye'`` before reaching code that
        referenced an undefined name.  It now mirrors ``on``: validate and
        log; no command is sent to the strip yet.

        Args:
            args: comma-separated device names (parameter name kept for
                backward compatibility).
        """
        devices = self._known_devices(args)
        logging.debug('turning off %s', devices)

    def status(self, args):
        """Report power status for the given devices (stub; does nothing)."""

    def reverse_usb_ports(self):
        """Return the USB map inverted to device -> port."""
        return {
            device: port for port, device in self.usb_port_to_device.items()
        }

    def reverse_strip_ports(self):
        """Return the strip map inverted to device -> list of ports.

        A list is used because one device name may appear on several ports.
        """
        device_to_ports = {}
        for strip_port, device in self.strip_port_to_device.items():
            device_to_ports.setdefault(device, []).append(strip_port)
        return device_to_ports

    def exp(self, p, results, timeout=30):
        """Wait for one of several patterns and return the matching label.

        Args:
            p: object with a pexpect-style ``expect(patterns, timeout=...)``
                method returning the index of the pattern that matched.
            results: mapping of pattern -> label.  It is copied, not
                modified.
            timeout: seconds passed through to ``p.expect``.

        Returns:
            The label associated with the pattern that matched.

        Exits with ``'timeout'`` or ``'disconnect'`` when pexpect reports
        TIMEOUT or EOF respectively.
        """
        pex = _require_pexpect()
        outcomes = dict(results)  # do not mutate the caller's dict
        outcomes[pex.TIMEOUT] = 'timeout'
        outcomes[pex.EOF] = 'disconnect'
        patterns = list(outcomes)
        matched = patterns[p.expect(patterns, timeout=timeout)]
        # Compare the matched pattern itself so a caller-supplied label that
        # happens to read 'timeout'/'disconnect' is not misinterpreted.
        if matched is pex.TIMEOUT:
            sys.exit('timeout')
        if matched is pex.EOF:
            sys.exit('disconnect')
        return outcomes[matched]

    def ssh_connect(self, host):
        """Open an ssh session to *host* and log in.

        Waits for ``'Password: '``, sends the password, then waits for the
        ``'Switched PDU: '`` prompt.

        Returns:
            The spawned pexpect session (previously nothing was returned, so
            the session was lost).
        """
        pex = _require_pexpect()
        session = pex.spawnu(' '.join(['ssh', host]))
        # NOTE(review): pexpect's logfile records sent data as well as
        # received data, so the password below is echoed to stderr; consider
        # logfile_read instead.
        session.logfile = sys.stderr
        self.exp(session, {'Password: ': 'prompt'})
        # NOTE(review): hard-coded credential; move it out of the source.
        session.sendline('admn')
        self.exp(session, {'Switched PDU: ': 'prompt'})
        return session


def main(argv=None):
    """Parse command-line arguments and run the selected action.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()

    actions = parser.add_argument_group('actions')
    action = actions.add_mutually_exclusive_group(required=True)
    action.add_argument('--status', metavar='DEVICES',
                        help='get power status of a device', type=str)
    action.add_argument('--off', metavar='DEVICES',
                        help='turn off device', type=str)
    action.add_argument('--on', metavar='DEVICES',
                        help='turn on device', type=str)
    action.add_argument('--reboot', metavar='DEVICES',
                        help='reboot a device', type=str)
    action.add_argument('--portoff', metavar='PORTS',
                        help='turn off a port', type=str)
    action.add_argument('--porton', metavar='PORTS',
                        help='turn on port', type=str)
    action.add_argument('--listdev', help='list devices',
                        action='store_true')
    action.add_argument('--listport', help='list ports',
                        action='store_true')
    action.add_argument('--listall', help='show all config',
                        action='store_true')

    options = parser.add_argument_group('options')
    options.add_argument('--output', help='choose output: text, json',
                         type=str, choices=['text', 'json'])
    options.add_argument('--debug', help='turn on debug output',
                         action='store_true')

    args = parser.parse_args(argv)
    p = power()
    p.process(args)


if __name__ == '__main__':
    main()