"""Web pages of the KSP game server.

Contains the shared page skeleton, login/registration, the player-facing
game pages and the organizer pages (games, users, logs).
"""
from flask import Flask, redirect, flash, session, g, request, get_flashed_messages
from wtforms import Form, BooleanField, StringField, PasswordField, validators, SubmitField, IntegerField, DateTimeField, SelectMultipleField
from wtforms.validators import ValidationError
from flask_wtf import FlaskForm
from flask_bootstrap import Bootstrap
import time
from datetime import datetime
from sqlalchemy import exc, update
import werkzeug.exceptions
import wtforms
from wtforms.fields import EmailField
from wtforms.widgets import NumberInput
import pprint

import hra.config as config
import hra.web.html as html
import hra.db as db
from hra.web import app
import hra.web.jinja_mac as jinja_mac
from hra.util import hash_passwd
import hra.lib as lib


@html.WrapAfterBuilder_decorator
def BasePage(b, content, head=lambda x:None, limited_size=True, sticky_head=True, empty=False):
    """Wrap ``content`` into the common HTML skeleton of the site.

    Args:
        b: HTML builder the page is written into.
        content: Items emitted as the page body.
        head: Callable passed to the builder inside ``<head>`` (extra head
            content); the default emits nothing.
        limited_size: Use the width-limited variant of the header content.
        sticky_head: Make the navigation wrapper sticky.
        empty: Emit only ``content`` inside ``<body>`` (no header, menu or
            flashed messages).

    NOTE(review): callers use ``BasePage()`` without arguments and add
    content afterwards; presumably ``WrapAfterBuilder_decorator`` supplies
    ``b`` and ``content`` -- confirm in ``hra.web.html``.
    """
    b.current_tag = html.Tag(b, "html", [])
    b.root_tag = b.current_tag
    with b.head():
        b.title()("KSP hra")
        b.meta(name="viewport", content="width=device-width, initial-scale=1.0")
        b.link(rel="stylesheet", href=app.url_for('static', filename='bootstrap.min.css'), type='text/css', media="all")
        b.link(rel="stylesheet", href=app.url_for('static', filename='ksp-mhd.css'), type='text/css', media="all")
        b.link(rel="icon", type="image/png", sizes="16x16", href="static/favicon.ico")
        b.link(rel="shortcut icon", href=app.url_for('static', filename='img/favicon.ico'))
        b(head)
    with b.body():
        if empty:
            b(*content)
        else:
            with b.header(_class=f"flavor-{config.WEB_FLAVOR}"):
                with b.div(_class="content content_limited" if limited_size else "content"):
                    with b.a(href="/", title="Na hlavní stránku"):
                        b.img(src=app.url_for('static', filename='hippo.png'), style="width: 60px;height: auto;", alt="KSP")
                    b.h1()("Hra na soustředění KSP")
            with b.div(id="nav-wrapper", _class="nav-wrapper-sticky" if sticky_head else ""):
                with b.nav(id="main-menu", _class="content"):
                    for item in g.menu:
                        b.a(href=item.url)(item.name)
                    if g.user:
                        b.a(_class="right", href="/")(f"Přihlášen: {g.user.username}")
                        b.a(_class="right", href="/logout")("Odhlásit")
                    else:
                        b.a(_class="right", href="/login")("Přihlásit")
                        b.a(_class="right", href="/registration")("Registrovat")
            with b.main():
                messages = get_flashed_messages(with_categories=True)
                if messages:
                    for category, message in messages:
                        # Flask's default category has no matching alert
                        # class, so show it as a warning.
                        if category == "message":
                            category = "warning"
                        b.div(_class=f"alert alert-{category}", role="alert")(message)
                b(*content)


class OptionalIntField(wtforms.IntegerField):
    """Integer field whose value is ``None`` when nothing was submitted."""

    widget = NumberInput()

    def process_formdata(self, valuelist):
        """Parse the first submitted value as an int; empty input gives None.

        Raises:
            wtforms.ValidationError: If the value is not a valid integer.
        """
        self.data = None
        if valuelist:
            if valuelist[0]:
                try:
                    self.data = int(valuelist[0])
                except ValueError:
                    raise wtforms.ValidationError('Nejedná se o číslo.')


def user_link(user):
    """Return an HTML fragment naming ``user``.

    ``None`` renders as "-". Organizers get a link to the user's detail
    page, everybody else plain text.
    """
    b = html.Builder()
    with b.line():
        if user is None:
            b("-")
        elif g.org:
            b.a(href=app.url_for(web_org_user.__name__, user_id=user.id))(user.print())
        else:
            b(user.print())
    return b.root_tag


def ip_link(ip):
    """Return an HTML link to the log listing filtered by ``ip``."""
    b = html.Builder()
    with b.line():
        b.a(href=app.url_for(web_org_logs.__name__, ip=ip)+"#main_table")(ip)
    return b.root_tag


def right_for_game(game):
    """Tell whether the current visitor may see ``game``.

    Organizers always may; a logged-in user may when they own at least one
    team in the game; anonymous visitors may not.
    """
    if g.org:
        return True
    if g.user is None:
        return False
    return db.get_session().query(db.Team).filter_by(game_id=game.game_id, user_id=g.user.id).count() > 0


def right_for_team(team):
    """Tell whether the current visitor may see ``team`` (organizer or owner)."""
    if g.org:
        return True
    if g.user is None:
        return False
    return team.user_id == g.user.id


def right_for_step(game):
    """Tell whether the current visitor may trigger a step of ``game``.

    Requires access to the game. In ``StepMode.user`` anyone with access may
    step; in ``StepMode.org`` only organizers may.
    """
    if not right_for_game(game):
        return False
    if g.org and game.step_mode == db.StepMode.org:
        return True
    if game.step_mode == db.StepMode.user:
        return True
    return False


def game_link(game):
    """Return an HTML link to the page of ``game`` ("-" for ``None``)."""
    b = html.Builder()
    with b.line():
        if game is None:
            b("-")
        else:
            b.a(href=app.url_for(web_game.__name__, game_id=game.game_id))(game.print())
    return b.root_tag


class RegistrationForm(FlaskForm):
    """Form for creating a new user account."""

    username = StringField('Jméno týmu', [validators.Length(min=2, max=25), validators.DataRequired()])
    captcha = StringField('Captcha', validators=[validators.DataRequired()])
    passwd = PasswordField('Heslo', [
        validators.DataRequired(),
        validators.EqualTo('confirm', message='Passwords must match')
    ])
    confirm = PasswordField('Heslo znovu', validators=[validators.DataRequired()])
    submit = SubmitField("Založit")

    def validate_captcha(form, field):
        """Inline validator: the captcha must equal ``config.CAPTCHA``."""
        if field.data != config.CAPTCHA:
            raise ValidationError("Chyba!")


class LoginForm(FlaskForm):
    """Form for logging in with a username and password."""

    username = StringField('Jméno týmu', [validators.DataRequired()])
    passwd = PasswordField('Heslo', [validators.DataRequired()])
    submit = SubmitField("Přihlásit")


@app.route("/registration", methods=['GET', 'POST'])
def registration():
    """Show the registration form and create the user on a valid submit."""
    f = RegistrationForm()
    if f.validate_on_submit():
        try:
            lib.create_user(f.username.data, f.passwd.data)
        except lib.UsernameExist:
            flash("Uživatelské jméno již existuje")
        else:
            db.get_session().commit()
            flash("Přidán nový uživatel.", 'success')
            return redirect("login")
    b = BasePage()
    b(jinja_mac.quick_form(f, form_type='horizontal'))
    return b.print_file()


@app.route("/login", methods=['GET', 'POST'])
def login():
    """Show the login form; on valid credentials store the user in the session."""
    f = LoginForm()
    if f.validate_on_submit():
        p_hash = hash_passwd(f.passwd.data)
        user = db.get_session().query(db.User).filter_by(username=f.username.data).one_or_none()
        # Credentials (even hashed) are deliberately not printed or logged.
        if user and user.passwd == p_hash:
            session.clear()
            session['uid'] = user.id
            flash("Přihlášení hotovo.", 'success')
            return redirect("/")
        flash("Chybné jméno nebo heslo.", 'danger')
    b = BasePage()
    b(jinja_mac.quick_form(f, form_type='horizontal'))
    return b.print_file()


@app.route("/logout")
def logout():
    """Clear the session and go to the index page."""
    session.clear()
    return redirect('/')


@app.template_filter()
def none_as_minus(x):
    """Template filter: render ``None`` as '-'."""
    return x if x is not None else '-'


@app.template_filter()
def round_points(x):
    """Template filter: round to 3 decimal places, passing ``None`` through."""
    if x is None:
        return None
    return round(x, 3)


@app.template_filter()
def print_time(t):
    """Template filter: format integer milliseconds as ``[-]M:SS``.

    ``None`` renders as '-'. Sub-second precision is dropped.
    """
    if t is None:
        return "-"
    sign = "-" if t < 0 else ""
    seconds = abs(t) // 1000
    return f"{sign}{seconds // 60}:{seconds % 60:02d}"


@app.route("/", methods=['GET', 'POST'])
def web_index():
    """Index page: the user's token and games, plus client download links."""
    b = BasePage()
    if g.user:
        teams = db.get_session().query(db.Team).filter_by(user_id=g.user.id).order_by(db.Team.game_id).all()
        # Teams are ordered by game_id, so comparing with the last collected
        # game is enough to list each game once.
        games = []
        for t in teams:
            if not games or games[-1].game_id != t.game_id:
                games.append(t.game)
        b.line().h3("Token")
        b.line().p().b(f"Váš token je: {g.user.token}")
        b.line().h3("Vaše hry")
        for game in games:
            b.line().p(game_link(game))
    else:
        b.line().p().b("Přihlaste se, prosím.")
    b.h3("Ke stažení")
    with b.p():
        b.p("Klient")
        with b.ul():
            u = app.url_for('static', filename='client.zip', _external=True)
            b.line().li("Jako zip: ", b._p(b._a(href=u)(u)))
            with b.line().li("Jednotlivé soubory:"):
                u = app.url_for('static', filename='client/client.py', _external=True)
                v = app.url_for('static', filename='client/play.py', _external=True)
                b.p(b._a(href=u)(u), b._br(), b._a(href=v)(v))
            b.line().li("Git: ", b._pre(f"git clone {app.url_for('static', filename='client.git', _external=True)}", style="margin: 0pt 0pt"))
    return b.print_file()


@app.route("/game/<int:game_id>", methods=['GET'])
def web_game(game_id):
    """Overview of one game: current round, teams with points, and actions.

    Raises:
        NotFound: The game does not exist.
        Forbidden: The visitor has no right to see the game.
    """
    ses = db.get_session()
    game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
    # Must precede any use of game.*; a missing game is a 404, not a crash.
    if game is None:
        raise werkzeug.exceptions.NotFound()
    if not right_for_game(game):
        raise werkzeug.exceptions.Forbidden()
    teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
    moves = ses.query(db.Move).filter_by(game_id=game_id, round=game.current_round).order_by(db.Move.team_id).all()
    # Teams and moves are paired positionally below; zip() would silently
    # drop rows if the counts differed.
    assert len(teams) == len(moves)

    b = BasePage()
    b.h2("Hra ", game.print())
    with b.line().p("Aktuální kolo: "):
        if game.working_on_next_state:
            b.b()(game.current_round, "++")
        else:
            b(game.current_round)
    with b.p().table(_class="data full"):
        with b.thead():
            b.line().th()("Id")
            b.line().th()("User")
            b.line().th()("Bodů")
            b.line().th()("Akce")
        for team, move in zip(teams, moves):
            with b.tr():
                b.line().td()(team.team_id)
                b.line().td()(user_link(team.user),": ", team.name)
                b.line().td()(move.points)
                with b.td():
                    with b.div(_class="btn-group", role="group"):
                        if right_for_team(team):
                            b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_game_view.__name__, game_id=game.game_id, team_id=team.team_id))("Zobrazit hru")
                            b.a(_class="btn btn-xs btn-default", href=app.url_for(web_org_logs.__name__, game_id=game_id, team_id=team.team_id)+"#main_table")("Log")
                        if g.org:
                            b.a(_class="btn btn-xs btn-default", href=app.url_for(web_org_game_userchange.__name__, game_id=game.game_id, team_id=team.team_id))("Změnit uživatele")
    with b.div(_class="btn-group", role="group"):
        if right_for_step(game):
            with b.form(method="POST", _class="btn-group", action=app.url_for(web_game_step.__name__, game_id=game_id)):
                b.button(_class="btn btn-primary", type="submit", name="su", value="yes")("Krok")
        if g.org:
            b.a(_class="btn btn-primary", href=app.url_for(web_org_logs.__name__, game_id=game_id)+"#main_table")("Log")
            b.a(_class="btn btn-default", href=app.url_for(web_org_game_round_inspect.__name__, game_id=game_id))("Inspekce kola")
    if g.org:
        b.h3("Aktuální konfigurace")
        b.p().pre(pprint.pformat(game.get_configuration()))
    return b.print_file()


@app.route("/game/<int:game_id>/step", methods=['POST'])
def web_game_step(game_id):
    """Perform one step of the game and redirect back to its page.

    Raises:
        NotFound: The game does not exist.
        Forbidden: The visitor may not step this game.
    """
    ses = db.get_session()
    game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
    if game is None:
        raise werkzeug.exceptions.NotFound()
    if not right_for_step(game):
        raise werkzeug.exceptions.Forbidden()
    try:
        lib.game_step(game_id, by_user=True)
    except lib.DuplicitMakeStep:
        flash("Duplicitní požadavek na krok", 'danger')
    except lib.TooEarlyStep:
        flash("Moc brzy na další požadavek na krok", 'danger')
    else:
        flash("Krok proveden", "success")
    return redirect(app.url_for(web_game.__name__, game_id=game_id))


@app.route("/game/<int:game_id>/view", methods=['GET'])
@app.route("/game/<int:game_id>/view/<int:team_id>", methods=['GET'])
def web_game_view(game_id, team_id=None):
    """Render the game-specific view of the current state.

    Args:
        game_id: Game to show.
        team_id: Optional team from whose point of view the state is shown.

    Raises:
        NotFound: Unknown game or team, the game logic has no ``view``, or
            the game has no current state.
        Forbidden: The visitor may not see the game or the team.
    """
    ses = db.get_session()
    game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
    if game is None:
        raise werkzeug.exceptions.NotFound()
    if not right_for_game(game):
        raise werkzeug.exceptions.Forbidden()
    teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()

    if team_id is not None:
        team = ses.query(db.Team).filter_by(game_id=game_id, team_id=team_id).one_or_none()
        # An unknown team must be rejected before right_for_team() reads it.
        if team is None:
            raise werkzeug.exceptions.NotFound()
        if not right_for_team(team):
            raise werkzeug.exceptions.Forbidden()
    else:
        team = None

    wl = get_wlogic(game)
    state = game.current_state()
    if not hasattr(wl, "view"):
        raise werkzeug.exceptions.NotFound()
    if state is None:
        raise werkzeug.exceptions.NotFound()
    return wl.view(state, team, teams)


@app.route("/org/games")
def web_org_games():
    """Organizer page: table of all games with their current round."""
    games = db.get_session().query(db.Game).order_by(db.Game.game_id).all()
    b = BasePage()
    b.h2("Hry")
    with b.p().table(_class="data full"):
        with b.thead():
            b.line().th("Id: Jméno")
            b.line().th("Kolo")
            b.line().th("Akce")
        # Named ``game`` rather than ``g`` so flask's ``g`` is not shadowed.
        for game in games:
            with b.tr():
                b.line().td()(game.print())
                with b.line().td():
                    if game.working_on_next_state:
                        b.b()(game.current_round, "++")
                    else:
                        b(game.current_round)
                with b.td():
                    b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_game.__name__, game_id=game.game_id))("Detail")
    return b.print_file()


class GameUserchangeForm(FlaskForm):
    """Form for renaming a team and reassigning it to a user."""

    name = StringField("Jméno hry z pohledu týmu")
    set_no_user = SubmitField("Bez uživatele")
    # Rendered hidden and first in the form (see web_org_game_userchange).
    submit_no_change = wtforms.SubmitField("Bez změny", render_kw={"style": "display: none"})


@app.route("/org/game/<int:game_id>/round_inspect", methods=['GET'])
@app.route("/org/game/<int:game_id>/round_inspect/<int:round_id>", methods=['GET'])
def web_org_game_round_inspect(game_id, round_id=None):
    """Per-team statistics (points, reads, pushes) of one round of a game.

    Args:
        game_id: Game to inspect.
        round_id: Round to inspect; defaults to the game's current round.

    Raises:
        NotFound: The game does not exist.
        Forbidden: The visitor has no right to see the game.
    """
    ses = db.get_session()
    game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
    # Must precede the game.current_round default below.
    if game is None:
        raise werkzeug.exceptions.NotFound()
    if not right_for_game(game):
        raise werkzeug.exceptions.Forbidden()
    if round_id is None:
        round_id = game.current_round
    teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
    moves = ses.query(db.Move).filter_by(game_id=game_id, round=round_id).order_by(db.Move.team_id).all()
    # Teams and moves are paired positionally below.
    assert len(teams) == len(moves)

    b = BasePage()
    b.h2(f"Inspekce kola {round_id} hry {game.print()}")
    with b.line().p("Aktuální kolo: "):
        if game.working_on_next_state:
            b.b()(game.current_round, "++")
        else:
            b(game.current_round)
    with b.p().table(_class="data full"):
        with b.thead():
            b.line().th()("Id")
            b.line().th()("User")
            b.line().th()("Bodů")
            b.line().th()("Stažení")
            b.line().th()("Nahrání")
            b.line().th()("Akce")
        for team, move in zip(teams, moves):
            with b.tr():
                b.line().td()(team.team_id)
                b.line().td()(user_link(team.user),": ", team.name)
                b.line().td()(move.points)
                b.line().td()(move.reads_count)
                b.line().td()(f"ok: {move.ok_pushs_count} w: {move.warnings_pushs_count} e: {move.err_pushs_count}")
                with b.td():
                    with b.div(_class="btn-group", role="group"):
                        if right_for_team(team):
                            b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_org_logs.__name__, game_id=game_id, team_id=team.team_id, round_id=round_id)+"#main_table")("Log")
    return b.print_file()


@app.route("/org/game/<int:game_id>/team/<int:team_id>/change_user", methods=['GET', 'POST'])
def web_org_game_userchange(game_id, team_id):
    """Organizer page: rename a team and assign it to a user (or to nobody).

    The user is chosen by which ``set_user_<id>`` submit button was pressed;
    any other submit except ``submit_no_change`` clears the assignment.

    Raises:
        NotFound: The game or the team does not exist.
    """
    ses = db.get_session()
    game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
    team_to_edit = ses.query(db.Team).filter_by(game_id=game_id, team_id=team_id).one_or_none()
    teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
    users = db.get_session().query(db.User).order_by(db.User.id).all()
    if game is None or team_to_edit is None:
        raise werkzeug.exceptions.NotFound()

    teams_by_user = {u.id: [] for u in users}
    for t in teams:
        if t.user_id is not None:
            teams_by_user[t.user_id].append(t)

    form = GameUserchangeForm(obj=team_to_edit)
    if form.validate_on_submit():
        team_to_edit.name = form.name.data
        if "submit_no_change" not in request.form:
            team_to_edit.user_id = None
            for u in users:
                if f"set_user_{u.id}" in request.form:
                    team_to_edit.user_id = u.id
        try:
            ses.commit()
        except exc.IntegrityError:
            flash("Duplicitní přiřazení", 'danger')
            ses.rollback()
        else:
            flash("Uživatel změněn", 'success')
            return redirect(app.url_for(web_game.__name__, game_id=game_id))

    b = BasePage()
    with b.form(action="", method="POST", _class="form form-horizontal", role="form"):
        b(form.csrf_token)
        # NOTE(review): presumably placed first so that submitting with
        # Enter activates this hidden "no change" button -- confirm.
        b(form.submit_no_change)
        with b.div(_class="form-row"):
            b(jinja_mac.form_field(form.name, size=8))
        with b.p().table(_class="data full"):
            with b.thead():
                b.line().th()("Username")
                b.line().th()("Přiřazení")
                b.line().th()("Akce")
            for u in users:
                with b.tr():
                    b.line().td(user_link(u))
                    with b.td():
                        if teams_by_user[u.id]:
                            with b.ul():
                                for t in teams_by_user[u.id]:
                                    b.li(f"{t.team_id} -> {t.name}")
                    b.line().td().input(_class="btn btn-danger" if u.id == team_to_edit.user_id else "btn btn-primary", _id="set_participation_state", name=f"set_user_{u.id}", type="submit", value="Přiřadit účtu")
        b(jinja_mac.form_field(form.set_no_user))
    return b.print_file()


@app.route("/org/users")
def web_org_users():
    """Organizer page: table of all users with the games they play."""
    users = db.get_session().query(db.User).order_by(db.User.id).all()
    teams = db.get_session().query(db.Team).all()
    games_for_user = {u.id: set() for u in users}
    for t in teams:
        if t.user_id is not None:
            games_for_user[t.user_id].add(t.game)

    b = BasePage()
    b.h2("Uživatelé")
    with b.p().table(_class="data full"):
        with b.thead():
            b.line().th()("Username")
            b.line().th()("Hry")
            b.line().th()("Akce")
        for u in users:
            with b.tr():
                b.line().td()(user_link(u))
                with b.td().ul():
                    # Named ``game`` rather than ``g`` so flask's ``g`` is
                    # not shadowed.
                    for game in games_for_user[u.id]:
                        b.line().li(game_link(game))
                with b.td():
                    with b.div(_class="btn-group", role="group"):
                        b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_org_user.__name__, user_id=u.id))("Detail")
    return b.print_file()


@app.route("/org/user/<int:user_id>", methods=['GET', 'POST'])
def web_org_user(user_id):
    """Organizer page: detail of one user with their teams.

    Raises:
        NotFound: The user does not exist.
    """
    user = db.get_session().query(db.User).filter_by(id=user_id).one_or_none()
    if not user:
        raise werkzeug.exceptions.NotFound()
    teams = db.get_session().query(db.Team).filter_by(user_id=user_id).order_by(db.Team.game_id).all()

    b = BasePage()
    b.line().h2("Uživatel ", user.print())
    with b.div(_class="btn-group", role="group"):
        with b.form(method="POST", _class="btn-group", action=app.url_for(web_org_user_su.__name__, user_id=user.id)):
            b.a(_class="btn btn-primary", href=app.url_for(web_org_logs.__name__, user_id=user.id)+"#main_table")("Log")
            b.button(_class="btn btn-default", type="submit", name="su", value="yes")("Převtělit")
    b.h3("Týmy")
    with b.p().table(_class="data full"):
        with b.thead():
            b.line().th()("Hra")
            b.line().th()("Jméno z pohledu týmu")
            b.line().th()("Číslo týmu")
        for team in teams:
            with b.tr():
                b.line().td()(game_link(team.game))
                b.line().td()(team.name)
                b.line().td()(team.team_id)
    return b.print_file()


@app.route("/org/user/<int:user_id>/su", methods=['POST'])
def web_org_user_su(user_id):
    """Switch the current session to the given user.

    Raises:
        NotFound: The user does not exist.

    NOTE(review): no authorization check is visible in this file; presumably
    access to ``/org/*`` is restricted elsewhere (e.g. a ``before_request``
    hook) -- confirm, since this endpoint allows impersonating any user.
    """
    user = db.get_session().query(db.User).filter_by(id=user_id).one_or_none()
    if user is None:
        raise werkzeug.exceptions.NotFound()
    session['uid'] = user.id
    flash("Uživatel vtělen!")
    return redirect('/')


@app.route("/org/log/<int:log_id>/data", methods=['GET'])
def web_org_log_data(log_id):
    """Organizer page: pretty-printed data attached to one log record.

    Raises:
        NotFound: The log record does not exist.
    """
    log = db.get_session().query(db.Log).filter_by(log_id=log_id).one_or_none()
    if log is None:
        raise werkzeug.exceptions.NotFound()
    b = BasePage()
    b.line().h2("Log ", log_id)
    b.p().pre(pprint.pformat(log.get_data()))
    return b.print_file()


class LogsFindForm(FlaskForm):
    """Search form of the log listing; every field is an optional filter."""

    username = StringField('Jméno uživatele')
    user_id = OptionalIntField('ID uživatele')
    game_id = OptionalIntField('ID hry')
    team_id = OptionalIntField('Tým')
    round_id = OptionalIntField('Kolo')
    limit = OptionalIntField('Limit')
    ip = StringField('IP')
    status = SelectMultipleField('Status', choices=[("", "*")]+[(x,x) for x in ["ok", "warning", "too_early", "too_late", "error", "http-error"]], render_kw={"size":7})
    endpoint = SelectMultipleField('Stránky', choices=db.Endpoint.choices())
    submit = SubmitField("Najít")


@app.route("/org/logs", methods=['GET'])
def web_org_logs():
    """Organizer page: searchable listing of request logs, newest first.

    Filters come from the query string (see ``LogsFindForm``). At most
    ``limit`` rows (default 20) are shown, together with the total number of
    matching rows.
    """
    f = LogsFindForm(formdata=request.args, csrf_enabled=False)
    q = db.get_session().query(db.Log).order_by(db.Log.log_id.desc())
    if request.args:
        f.validate()
    # ``x or None`` below: a submitted 0 selects rows where the column is NULL.
    if f.user_id.data is not None:
        q = q.filter_by(user_id=f.user_id.data or None)
    if f.game_id.data is not None:
        q = q.filter_by(game_id=f.game_id.data or None)
    if f.team_id.data is not None:
        q = q.filter_by(team_id=f.team_id.data or None)
    if f.round_id.data is not None:
        q = q.filter_by(round=f.round_id.data)
    if f.ip.data:
        q = q.filter(db.Log.source_ip.ilike(f"%{f.ip.data}%"))
    if f.status.data and "" not in f.status.data:
        q = q.filter(db.Log.status.in_(f.status.data))
    if f.endpoint.data:
        q = q.filter(db.Log.endpoint.in_(f.endpoint.data))
    if f.username.data:
        # Filtering on User needs an explicit join, otherwise every log row
        # is paired with every matching user. The join comes last because
        # filter_by() targets the most recently joined entity.
        q = q.join(db.User, db.Log.user_id == db.User.id)
        q = q.filter(db.User.username.ilike(f"%{f.username.data}%"))
    limit = f.limit.data or 20
    logs = q.limit(limit).all()
    count = q.count()

    b = BasePage()
    # The search form is submitted via GET; do not render a CSRF token.
    del f.csrf_token
    b.line().h2("Logy")
    b(jinja_mac.quick_form(f, form_type='horizontal', method="GET"))
    with b.p().table(_class="data full"):
        with b.thead(id="main_table"):
            b.line().th("Čas")
            b.line().th("Uživatel")
            b.line().th("URL")
            b.line().th("Hra")
            b.line().th("Status")
            b.line().th("Popis")
            b.line().th("GET")
            b.line().th("Akce")
        for l in logs:
            with b.tr(_class="log_"+l.status):
                b.line().td(l.time.strftime("%H:%M:%S"))
                b.line().td(user_link(l.user) if l.user else None, b._br(), ip_link(l.source_ip))
                b.line().td(l.endpoint._name_, b._br(), l.url)
                b.line().td(l.game.print() if l.game else None, b._br(), "tým: ", l.team_id, " kolo: ", l.round)
                b.line().td(l.status)
                b.line().td(l.text)
                with b.line().td():
                    for k,v in l.get.items():
                        b(f"{k}: {v}").br()
                with b.line().td():
                    with b.div(_class="btn-group", role="group"):
                        if l.data is not None:
                            b.a(href=app.url_for(web_org_log_data.__name__, log_id=l.log_id), _class="btn btn-xs btn-primary")("Data")
    if count <= limit:
        b.line().p(f"Toť vše ({count})")
    else:
        b.line().p().b(f"Zobrazeno prvních {limit} logů z {count}")
    return b.print_file()


# Kept at the bottom of the module, as in the original file.
# NOTE(review): presumably to avoid a circular import with hra.web.game -- confirm.
from hra.web.game import get_wlogic, wlogic_by_mode