"""Command-line helper that creates a Gitea account and adds it to a seminar team.

The script checks a few preconditions against the Gitea API, creates the user
with a throw-away random password, adds the user to the ``org`` team of the
requested organization and finally triggers Gitea's "forgot password" form so
that the new user receives an e-mail to set their own password.
"""
import random
import secrets
import string
from sys import stderr

import click
import requests
import termcolor
from bs4 import BeautifulSoup as bs
from swagger_client.api.admin_api import AdminApi
from swagger_client.api.miscellaneous_api import MiscellaneousApi
from swagger_client.api.organization_api import OrganizationApi
from swagger_client.api.user_api import UserApi
from swagger_client.api_client import ApiClient
from swagger_client.models import CreateUserOption, Team, User, Organization
from swagger_client.rest import ApiException

# Load configuration: prefer a local ``config`` module, fall back to defaults.
try:
    import config
except ImportError:
    import config_default as config

HOST = config.HOST
ADMIN_TOKEN = config.ADMIN_TOKEN


def rand_password(n: int) -> str:
    """Return a random password with a fixed prefix followed by *n* letters.

    The fixed prefix ``'Aa1!@'`` is there to satisfy Gitea's password
    complexity requirements; the remaining part is random.

    @param n  Length of the random part of the string.
    """
    req_compliance_str = 'Aa1!@'
    # ``secrets`` instead of ``random``: this value is used as a credential.
    rnd_str = ''.join(secrets.choice(string.ascii_letters) for _ in range(n))
    return req_compliance_str + rnd_str


def text_red(text) -> str:
    """Return *text* colored red for terminal output."""
    return termcolor.colored(text, 'red')


def text_green(text) -> str:
    """Return *text* colored green for terminal output."""
    return termcolor.colored(text, 'green')


def text_orange(text) -> str:
    """Return *text* colored with termcolor's 'yellow'."""
    return termcolor.colored(text, 'yellow')


def is_server_accesible(misc_api: MiscellaneousApi) -> bool:
    """Return True if a ``get_version`` call succeeds, False on any error."""
    try:
        misc_api.get_version()
    except Exception:
        # Deliberately broad: any failure of the probe means "not reachable".
        # Unlike a bare ``except:`` this lets Ctrl-C / SystemExit through.
        return False
    return True


def does_user_exist(user_api: UserApi, username: str) -> bool:
    """Return whether a user called *username* exists.

    A 404 response is treated as "does not exist"; any other ApiException
    is re-raised to the caller.
    """
    try:
        user = user_api.user_get(username)
    except ApiException as e:
        if e.status == 404:
            # user was not found
            return False
        raise
    return user is not None


def does_organization_exist(org_api: OrganizationApi, orgname: str) -> bool:
    """Return whether an organization called *orgname* exists.

    A 404 response is treated as "does not exist"; any other ApiException
    is re-raised to the caller.
    """
    try:
        org = org_api.org_get(orgname)
    except ApiException as e:
        if e.status == 404:
            # organization was not found
            return False
        raise
    return org is not None


def does_organization_have_team_with_name(org_api: OrganizationApi, orgname: str, teamname: str) -> bool:
    """Return whether organization *orgname* has a team named *teamname*."""
    try:
        get_team_id_by_name(org_api, orgname, teamname)
    except AssertionError:
        # get_team_id_by_name signals "no such team" with AssertionError.
        return False
    return True


def get_team_id_by_name(org_api: OrganizationApi, orgname: str, teamname: str) -> int:
    """Return the ID of the team, or raise AssertionError if it does not exist."""
    teams = org_api.org_list_teams(orgname)
    for t in teams:
        if teamname == t.name:
            return t.id
    raise AssertionError("Team se jmenem '{}' v organizaci '{}' neexistuje!".format(teamname, orgname))


def validate(func_check, message: str):
    """Print *message*, run *func_check* and terminate the process on failure.

    Prints ``[OK]`` and returns when *func_check* returns a truthy value.
    On an ApiException, or when the check returns a falsy value, prints
    ``[FAIL]`` plus a reason and exits with status 1.
    """
    print(message, end="")
    try:
        if func_check():
            print(text_green(' [OK]'))
            return
    except ApiException as e:
        print(text_red(' [FAIL]'))
        print(text_red(e.body))
        print("Request se nepodaril! Duvod:", e.reason, file=stderr)
        # SystemExit directly: the ``exit`` builtin is only injected by ``site``.
        raise SystemExit(1)
    print(text_red(' [FAIL]'))
    print("Neocekavany stav systemu!", file=stderr)
    raise SystemExit(1)


def checked_api_action(func_action, message: str):
    """Print *message*, run *func_action* and return its result.

    On an ApiException prints ``[FAIL]`` together with the exception body
    and exits with status 1.
    """
    print(message, end=' ')
    try:
        res = func_action()
    except ApiException as e:
        print(text_red('[FAIL]'))
        print(text_red('Server odpovedel:'))
        print(text_red(e.body))
        raise SystemExit(1)
    print(text_green('[OK]'))
    return res


def reset_password(email: str):
    """Submit the web "forgot password" form for *email*.

    The form URL is derived from HOST by replacing ``api/v1`` with
    ``user/forgot_password``. All named ``<input>`` fields found on the page
    are posted back as-is (which carries over any hidden fields), with the
    ``email`` field set to *email*.

    Raises ApiException when the POST does not answer with HTTP 200, so the
    function can be wrapped in ``checked_api_action`` like a regular API call.
    """
    url = HOST.replace('api/v1', 'user/forgot_password')

    with requests.session() as session:
        # fetch the form
        resp = session.get(url)

        # fill in the form; inputs without a name cannot be submitted and
        # inputs without a value are sent empty instead of raising KeyError
        soup = bs(resp.text, features='lxml')
        form = {
            tag['name']: tag.get('value', '')
            for tag in soup.find_all('input')
            if tag.get('name')
        }
        form['email'] = email

        # submit form
        resp = session.post(url, data=form)

    # simulate being standard api call
    if resp.status_code != 200:
        # NOTE(review): swagger-codegen's ApiException reads .status/.reason/
        # .data from ``http_resp`` when it is truthy, so passing a plain
        # string there fails with AttributeError. Build it from status/reason
        # and attach the message as the body that checked_api_action prints.
        error = ApiException(status=resp.status_code, reason=resp.reason)
        error.body = 'Form submission failed.'
        raise error


# NOTE: the docstring of ``register`` is printed by click as the --help text,
# i.e. it is user-facing output; it is intentionally kept in Czech.
@click.command()
@click.option('-a', '--admin-token', help='Admin access token')
@click.option('-h', '--host', help='Gitea URL (default: {})'.format(HOST))
@click.option('-e', '--email', help='Emailova adresa noveho uzivatele.', required=True)
@click.option('-u', '--username', help='Uzivatelske jmeno noveho uzivatele.', required=True)
@click.option('-f', '--fullname', help='Plne jmeno noveho uzivatele.')
@click.option('-s', '--seminar', help='Seminar, kde uzivatel orguje.', required=True)
def register(username: str, seminar: str, fullname: str, email: str, host, admin_token):
    """
    Nastroj pro automatizaci zakladani uctu na serveru Gitea.

    CO SKRIPT DELA:

    Prvni probehne kontrola stavu systemu, zda je mozne uzivatele bezpecne zalozit.
    Pote se provede pokus o zalozeni nasledovan pridanim uzivatele do skupiny 'org'
    v pozadovane organizaci (jmeno dle seminare). Vytvoreny uzivatel ma nastavene
    nahodne heslo. To je automaticky resetovano a o tom je odeslan email.

    KONFIGURACE:

    V souboru config.py je mozne nastavit pristupovy token a URL perzistentne.
    Pristupovy token musi byt od uzivatele, ktery je ve skupine 'Owners' v pozadovanem
    seminari. Jinak nema dostatecna prava na pridavani uzivatelu.
    """
    # Command-line values override the configured module-level defaults.
    # HOST has to be updated globally because reset_password() reads it.
    global HOST
    HOST = host if host is not None else HOST
    global ADMIN_TOKEN
    ADMIN_TOKEN = admin_token if admin_token is not None else ADMIN_TOKEN

    api_client = ApiClient()
    api_client.configuration.host = HOST
    api_client.configuration.api_key = {'token': ADMIN_TOKEN}

    admin_api = AdminApi(api_client)
    user_api = UserApi(api_client)
    org_api = OrganizationApi(api_client)
    misc_api = MiscellaneousApi(api_client)

    # check the assumptions about the state of the system
    validate(lambda: is_server_accesible(misc_api), "Server je dostupny...")
    validate(lambda: not does_user_exist(user_api, username), "Uzivatelske jmeno je volne...")
    validate(lambda: does_organization_exist(org_api, seminar), "Organizace pro seminar existuje...")
    validate(lambda: does_organization_have_team_with_name(org_api, seminar, 'org'),
             "V danem seminari existuje team 'org'...")
    print(text_green('Zakladni predpoklady pro uspesne zalozeni uctu splneny!\n'))

    # collect all the information we need
    org_team_id = get_team_id_by_name(org_api, seminar, 'org')

    # prepare the user to be created
    create_user_req = CreateUserOption(email=email, full_name=fullname, username=username,
                                       password=rand_password(20))

    # apply the changes to the system
    checked_api_action(lambda: admin_api.admin_create_user(body=create_user_req),
                       "Zakladam noveho uzivatele...")
    checked_api_action(lambda: org_api.org_add_team_member(org_team_id, username),
                       "Pridavam do teamu 'org' v seminari {}...".format(seminar))
    print(text_green('Uzivatel zalozen!'))
    print()
    checked_api_action(lambda: reset_password(email), "Resetuji heslo...")


if __name__ == '__main__':
    register()