#!/usr/bin/env python3 import os, sys, argparse, json, datetime, pwd, grp from dataclasses import dataclass from typing import Optional, Any @dataclass(frozen=True, order=True) class UserInfo: uid: int name: str homedir: str def __repr__(self) -> str: return f"{self.name}:{self.uid}" @staticmethod def create(struct: pwd.struct_passwd) -> 'UserInfo': return UserInfo(struct.pw_uid, struct.pw_name, struct.pw_dir) def get_users(users: list[str], groups: list[str], verbose: bool) -> list[UserInfo]: result = [] for user in users: try: if user.isdigit(): result.append(UserInfo.create(pwd.getpwuid(int(user)))) else: result.append(UserInfo.create(pwd.getpwnam(user))) except KeyError: print(f"User {user} does not exist", file=sys.stderr) if verbose: print(f"Added hardcoded users: {result}") for group in groups: try: if group.isdigit(): g = grp.getgrgid(int(group)) else: g = grp.getgrnam(group) except KeyError: print(f"Group {group} does not exist", file=sys.stderr) continue # gp_mem contains only those members which don't have gid == g.gr_gid members = [ UserInfo.create(pwd.getpwnam(member)) for member in g.gr_mem ] members.extend(UserInfo.create(p) for p in pwd.getpwall() if p.pw_gid == g.gr_gid) if verbose: print(f"Added all group '{group}' members: {members}") result.extend(members) return list(sorted(set(result))) def read_authorized_keys(user: UserInfo) -> list[str]: original_uid = os.geteuid() try: os.seteuid(user.uid) keys_file = os.path.join(user.homedir, ".ssh", "authorized_keys") if not os.path.exists(keys_file): return [] with open(keys_file) as f: return f.readlines() finally: os.seteuid(original_uid) def main_args(user_names: list[str], group_names: list[str], output_file: str, verbose: bool): if os.getuid() != 0: print("WARNING: This script should be executed as root", file=sys.stderr) users = get_users(user_names, group_names, verbose) result = [] for user in users: user_row: dict[str, Any] = { "name": user.name, "uid": user.uid, } try: keys = read_authorized_keys(user) user_row["keys"] = keys if verbose: print(f"Read {len(keys)} keys for {user}") except Exception as e: print(f"Error reading keys for {user}: {e}", file=sys.stderr) user_row["failure"] = True result.append(user_row) with open(output_file + ".part", "w") as f: json.dump({ "users": result, "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), }, f, indent=4) os.rename(output_file + ".part", output_file) def main(): parser = argparse.ArgumentParser(description='Collect .ssh/authorized_keys files from a specified list of users') parser.add_argument("--users", nargs="+", default=[], help="Collect keys from these users (username or uid)") parser.add_argument("--groups", nargs="+", default=[], help="Collect keys from all users in these groups (group name or gid)") parser.add_argument("--output", help="Output file to write authorized_keys to", required=True) parser.add_argument("--verbose", action="store_true", help="Print debug messages") args = parser.parse_args() main_args(args.users, args.groups, args.output, args.verbose) main()