You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
551 lines
20 KiB
551 lines
20 KiB
from flask import Flask, redirect, flash, session, g, request, get_flashed_messages
|
|
from wtforms import Form, BooleanField, StringField, PasswordField, validators, SubmitField, IntegerField, DateTimeField
|
|
from wtforms.validators import ValidationError
|
|
from flask_wtf import FlaskForm
|
|
from flask_bootstrap import Bootstrap
|
|
import time
|
|
from datetime import datetime
|
|
from sqlalchemy import exc, update
|
|
import werkzeug.exceptions
|
|
import wtforms
|
|
from wtforms.fields import EmailField
|
|
from wtforms.widgets import NumberInput
|
|
import pprint
|
|
|
|
import hra.config as config
|
|
import hra.web.html as html
|
|
import hra.db as db
|
|
from hra.web import app
|
|
import hra.web.jinja_mac as jinja_mac
|
|
from hra.util import hash_passwd
|
|
import hra.lib as lib
|
|
|
|
|
|
@html.WrapAfterBuilder_decorator
|
|
def BasePage(b, content, head=lambda x:None, limited_size=True, sticky_head=True):
|
|
b.current_tag = html.Tag(b, "html", [])
|
|
b.root_tag = b.current_tag
|
|
with b.head():
|
|
b.title()("KSP hra")
|
|
b.meta(name="viewport", content="width=device-width, initial-scale=1.0")
|
|
b.link(rel="stylesheet", href=app.url_for('static', filename='bootstrap.min.css'), type='text/css', media="all")
|
|
b.link(rel="stylesheet", href=app.url_for('static', filename='ksp-mhd.css'), type='text/css', media="all")
|
|
b.link(rel="icon", type="image/png", sizes="16x16", href="static/favicon.ico")
|
|
b.link(rel="shortcut icon", href=app.url_for('static', filename='img/favicon.ico'))
|
|
b(head)
|
|
with b.body() as body:
|
|
with b.header(_class=f"flavor-{config.WEB_FLAVOR}"):
|
|
with b.div(_class="content content_limited" if limited_size else "content"):
|
|
with b.a(href="/", title="Na hlavní stránku"):
|
|
b.img(src=app.url_for('static', filename='hippo.png'), style="width: 60px;height: auto;", alt="KSP")
|
|
b.h1()("Hra na soustředění KSP")
|
|
with b.div(id="nav-wrapper", _class="nav-wrapper-sticky" if sticky_head else ""):
|
|
with b.nav(id="main-menu", _class="content"):
|
|
for item in g.menu:
|
|
b.a(href=item.url)(item.name)
|
|
if g.user:
|
|
b.a(_class="right", href="/")(f"Přihlášen: {g.user.username}")
|
|
b.a(_class="right", href="/logout")(f"Odhlásit")
|
|
else:
|
|
b.a(_class="right", href="/login")(f"Přihlásit")
|
|
b.a(_class="right", href="/registration")(f"Registrovat")
|
|
with b.main():
|
|
messages = get_flashed_messages(with_categories=True)
|
|
if messages:
|
|
for category, message in messages:
|
|
if category == "message":
|
|
category = "warning"
|
|
b.div(_class=f"alert alert-{category}", role="alert")(message)
|
|
b(*content)
|
|
|
|
|
|
class OptionalIntField(wtforms.IntegerField):
|
|
widget = NumberInput()
|
|
|
|
def process_formdata(self, valuelist):
|
|
self.data = None
|
|
if valuelist:
|
|
if valuelist[0]:
|
|
try:
|
|
self.data = int(valuelist[0])
|
|
except ValueError:
|
|
raise wtforms.ValidationError('Nejedná se o číslo.')
|
|
|
|
def user_link(user):
|
|
b = html.Builder()
|
|
with b.line():
|
|
if user is None:
|
|
b("-")
|
|
else:
|
|
if g.org:
|
|
b.a(href=app.url_for(web_org_user.__name__, user_id=user.id))(user.print())
|
|
else:
|
|
b(user.print())
|
|
return b.root_tag
|
|
|
|
|
|
def right_for_game(game):
|
|
if g.org:
|
|
return True
|
|
if g.user is None:
|
|
return False
|
|
return db.get_session().query(db.Team).filter_by(game_id=game.game_id, user_id=g.user.id).count() > 0
|
|
|
|
|
|
def right_for_team(team):
|
|
if g.org:
|
|
return True
|
|
if g.user is None:
|
|
return False
|
|
return team.user_id == g.user.id
|
|
|
|
|
|
def right_for_step(game):
|
|
if right_for_game(game):
|
|
if g.org:
|
|
if game.step_mode == db.StepMode.org:
|
|
return True
|
|
if game.step_mode == db.StepMode.user:
|
|
return True
|
|
return False
|
|
|
|
|
|
|
|
def game_link(game):
|
|
b = html.Builder()
|
|
with b.line():
|
|
if game is None:
|
|
b("-")
|
|
else:
|
|
b.a(href=app.url_for(web_game.__name__, game_id=game.game_id))(game.print())
|
|
return b.root_tag
|
|
|
|
|
|
class RegistrationForm(FlaskForm):
|
|
username = StringField('Jméno týmu', [validators.Length(min=2, max=25), validators.DataRequired()])
|
|
captcha = StringField('Captcha', validators=[validators.DataRequired()])
|
|
passwd = PasswordField('Heslo', [
|
|
validators.DataRequired(),
|
|
validators.EqualTo('confirm', message='Passwords must match')
|
|
])
|
|
confirm = PasswordField('Heslo znovu', validators=[validators.DataRequired()])
|
|
submit = SubmitField("Založit")
|
|
|
|
def validate_captcha(form, field):
|
|
if field.data != config.CAPTCHA:
|
|
raise ValidationError("Chyba!")
|
|
|
|
|
|
class LoginForm(FlaskForm):
|
|
username = StringField('Jméno týmu', [validators.DataRequired()])
|
|
passwd = PasswordField('Heslo', [validators.DataRequired()])
|
|
submit = SubmitField("Přihlásit")
|
|
|
|
|
|
|
|
@app.route("/registration", methods=['GET', 'POST'])
|
|
def registration():
|
|
f = RegistrationForm()
|
|
if f.validate_on_submit():
|
|
try:
|
|
lib.create_user(f.username.data, f.passwd.data)
|
|
except lib.UsernameExist:
|
|
flash("Uživatelské jméno již existuje")
|
|
else:
|
|
db.get_session().commit()
|
|
flash("Přidán nový uživatel.", 'success')
|
|
return redirect("login")
|
|
b = BasePage()
|
|
b(jinja_mac.quick_form(f, form_type='horizontal'))
|
|
return b.print_file()
|
|
|
|
@app.route("/login", methods=['GET', 'POST'])
|
|
def login():
|
|
f = LoginForm()
|
|
if f.validate_on_submit():
|
|
p_hash=hash_passwd(f.passwd.data)
|
|
user = db.get_session().query(db.User).filter_by(username=f.username.data).one_or_none()
|
|
print(user, p_hash)
|
|
if user and user.passwd == p_hash:
|
|
session.clear()
|
|
session['uid'] = user.id
|
|
flash("Přihlášení hotovo.", 'success')
|
|
return redirect("/")
|
|
flash("Chybné jméno nebo heslo.", 'danger')
|
|
|
|
b = BasePage()
|
|
b(jinja_mac.quick_form(f, form_type='horizontal'))
|
|
return b.print_file()
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
session.clear()
|
|
return redirect('/')
|
|
|
|
|
|
@app.template_filter()
|
|
def none_as_minus(x):
|
|
return x if x is not None else '-'
|
|
|
|
|
|
@app.template_filter()
|
|
def round_points(x):
|
|
if x is None:
|
|
return None
|
|
return round(x,3)
|
|
|
|
|
|
@app.template_filter()
|
|
def print_time(t):
|
|
if t == None:
|
|
return "-"
|
|
return ("-" if t<0 else "") + str(abs(t)//1000//60) + ":" + ("00{0:d}".format(abs(t)//1000%60))[-2:]
|
|
|
|
|
|
|
|
|
|
@app.route("/", methods=['GET', 'POST'])
|
|
def web_index():
|
|
b = BasePage()
|
|
|
|
if g.user:
|
|
teams = db.get_session().query(db.Team).filter_by(user_id=g.user.id).order_by(db.Team.game_id).all()
|
|
games = []
|
|
for t in teams:
|
|
if len(games) == 0 or games[-1].game_id != t.game_id:
|
|
games.append(t.game)
|
|
|
|
b.line().h3("Token")
|
|
b.line().p().b(f"Váš token je: {g.user.token}")
|
|
b.line().h3("Vaše hry")
|
|
for game in games:
|
|
b.line().p(game_link(game))
|
|
else:
|
|
b.line().p("Přihlaste se, prosím.")
|
|
|
|
return b.print_file()
|
|
|
|
@app.route("/game/<int:game_id>", methods=['GET'])
|
|
def web_game(game_id):
|
|
ses = db.get_session()
|
|
game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
|
|
teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
|
|
moves = ses.query(db.Move).filter_by(game_id=game_id, round=game.current_round).order_by(db.Move.team_id).all()
|
|
assert len(teams) == len(moves)
|
|
|
|
if game is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
if not right_for_game(game):
|
|
raise werkzeug.exceptions.Forbidden()
|
|
|
|
b = BasePage()
|
|
b.h2("Hra ", game.print())
|
|
with b.line().p("Aktuální kolo: "):
|
|
if game.working_on_next_state:
|
|
b.b()(game.current_round, "++")
|
|
else:
|
|
b(game.current_round)
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Id")
|
|
b.line().th()("User")
|
|
b.line().th()("Bodů")
|
|
b.line().th()("Akce")
|
|
for team, move in zip(teams, moves):
|
|
with b.tr():
|
|
b.line().td()(team.team_id)
|
|
b.line().td()(user_link(team.user),": ", team.name)
|
|
b.line().td()(move.points)
|
|
with b.td():
|
|
with b.div(_class="btn-group", role="group"):
|
|
if right_for_team(team):
|
|
b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_game_view.__name__, game_id=game.game_id, team_id=team.team_id))("Zobrazit hru")
|
|
if g.org:
|
|
b.a(_class="btn btn-xs btn-default", href=app.url_for(web_org_game_userchange.__name__, game_id=game.game_id, team_id=team.team_id))("Změnit uživatele")
|
|
|
|
with b.div(_class="btn-gruser_idoup", role="group"):
|
|
if right_for_step(game):
|
|
with b.form(method="POST", _class="btn-group", action=app.url_for(web_game_step.__name__, game_id=game_id)):
|
|
b.button(_class="btn btn-primary", type="submit", name="su", value="yes")("Krok")
|
|
if g.org:
|
|
b.a(_class="btn btn-default", href=app.url_for(web_org_game_round_inspect.__name__, game_id=game_id))("Inspekce kola")
|
|
|
|
if g.org:
|
|
b.h3("Aktuální konfigurace")
|
|
b.p().pre(pprint.pformat(game.get_configuration()))
|
|
return b.print_file()
|
|
|
|
@app.route("/game/<int:game_id>/step", methods=['POST'])
|
|
def web_game_step(game_id):
|
|
ses = db.get_session()
|
|
game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
|
|
if game is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
if not right_for_step(game):
|
|
raise werkzeug.exceptions.Forbidden()
|
|
|
|
lib.game_step(game_id)
|
|
|
|
return redirect(app.url_for(web_game.__name__, game_id=game_id))
|
|
|
|
@app.route("/game/<int:game_id>/view/<int:team_id>", methods=['GET'])
|
|
def web_game_view(game_id, team_id=None):
|
|
ses = db.get_session()
|
|
game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
|
|
teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
|
|
if game is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
if not right_for_game(game):
|
|
raise werkzeug.exceptions.Forbidden()
|
|
if team_id is not None:
|
|
team = ses.query(db.Team).filter_by(game_id=game_id, team_id=team_id).one_or_none()
|
|
if game is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
if not right_for_team(team):
|
|
raise werkzeug.exceptions.Forbidden()
|
|
else:
|
|
team = None
|
|
|
|
wl = get_wlogic(game)
|
|
state = game.current_state()
|
|
|
|
if not hasattr(wl, "view"):
|
|
raise werkzeug.exceptions.NotFound()
|
|
|
|
if state is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
|
|
return wl.view(state, team, teams)
|
|
|
|
|
|
|
|
|
|
@app.route("/org/games")
|
|
def web_org_games():
|
|
games = db.get_session().query(db.Game).order_by(db.Game.game_id).all()
|
|
b = BasePage()
|
|
b.h2("Hry")
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th("Id: Jméno")
|
|
b.line().th("Kolo")
|
|
b.line().th("Akce")
|
|
for g in games:
|
|
with b.tr():
|
|
b.line().td()(g.print())
|
|
with b.line().td():
|
|
if g.working_on_next_state:
|
|
b.b()(g.current_round, "++")
|
|
else:
|
|
b(g.current_round)
|
|
with b.td():
|
|
b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_game.__name__, game_id=g.game_id))("Detail")
|
|
return b.print_file()
|
|
|
|
class GameUserchangeForm(FlaskForm):
|
|
name = StringField("Jméno hry z pohledu týmu")
|
|
set_no_user = SubmitField("Bez uživatele")
|
|
submit_no_change = wtforms.SubmitField("Bez změny", render_kw={"style": "display: none"})
|
|
|
|
|
|
@app.route("/org/game/<int:game_id>/round_inspect", methods=['GET'])
|
|
@app.route("/org/game/<int:game_id>/round_inspect/<int:round_id>", methods=['GET'])
|
|
def web_org_game_round_inspect(game_id, round_id=None):
|
|
ses = db.get_session()
|
|
game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
|
|
if round_id is None:
|
|
round_id = game.current_round
|
|
teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
|
|
moves = ses.query(db.Move).filter_by(game_id=game_id, round=round_id).order_by(db.Move.team_id).all()
|
|
assert len(teams) == len(moves)
|
|
|
|
if game is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
if not right_for_game(game):
|
|
raise werkzeug.exceptions.Forbidden()
|
|
|
|
b = BasePage()
|
|
b.h2(f"Inspekce kola {round_id} hry {game.print()}")
|
|
with b.line().p("Aktuální kolo: "):
|
|
if game.working_on_next_state:
|
|
b.b()(game.current_round, "++")
|
|
else:
|
|
b(game.current_round)
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Id")
|
|
b.line().th()("User")
|
|
b.line().th()("Bodů")
|
|
b.line().th()("Stažení")
|
|
b.line().th()("Nahrání")
|
|
for team, move in zip(teams, moves):
|
|
with b.tr():
|
|
b.line().td()(team.team_id)
|
|
b.line().td()(user_link(team.user),": ", team.name)
|
|
b.line().td()(move.points)
|
|
b.line().td()(move.reads_count)
|
|
b.line().td()(f"{move.ok_pushs_count} {move.warnings_pushs_count} {move.err_pushs_count}")
|
|
return b.print_file()
|
|
|
|
|
|
@app.route("/org/game/<int:game_id>/team/<int:team_id>/change_user", methods=['GET', 'POST'])
|
|
def web_org_game_userchange(game_id, team_id):
|
|
ses = db.get_session()
|
|
game = ses.query(db.Game).filter_by(game_id=game_id).one_or_none()
|
|
team_to_edit = ses.query(db.Team).filter_by(game_id=game_id, team_id=team_id).one_or_none()
|
|
teams = ses.query(db.Team).filter_by(game_id=game_id).order_by(db.Team.team_id).all()
|
|
users = db.get_session().query(db.User).order_by(db.User.id).all()
|
|
if game is None or team_to_edit is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
teams_by_user = {u.id:[] for u in users}
|
|
for t in teams:
|
|
if t.user_id is not None:
|
|
teams_by_user[t.user_id].append(t)
|
|
|
|
form = GameUserchangeForm(obj=team_to_edit)
|
|
if form.validate_on_submit():
|
|
team_to_edit.name = form.name.data
|
|
if "submit_no_change" not in request.form:
|
|
team_to_edit.user_id = None
|
|
for u in users:
|
|
if f"set_user_{u.id}" in request.form:
|
|
team_to_edit.user_id = u.id
|
|
try:
|
|
ses.commit()
|
|
except exc.IntegrityError:
|
|
flash("Duplicitní přiřazení", 'danger')
|
|
ses.rollback()
|
|
else:
|
|
flash("Uživatel změněn", 'success')
|
|
return redirect(app.url_for(web_game.__name__, game_id=game_id))
|
|
|
|
b = BasePage()
|
|
with b.form(action="", method="POST", _class="form form-horizontal", role="form"):
|
|
b(form.csrf_token)
|
|
b(form.submit_no_change)
|
|
with b.div(_class="form-row"):
|
|
b(jinja_mac.form_field(form.name, size=8))
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Username")
|
|
b.line().th()("Přiřazení")
|
|
b.line().th()("Akce")
|
|
for u in users:
|
|
with b.tr():
|
|
b.line().td(user_link(u))
|
|
with b.td():
|
|
if len(teams_by_user[u.id]):
|
|
with b.ul():
|
|
for t in teams_by_user[u.id]:
|
|
b.li(f"{t.team_id} -> {t.name}")
|
|
b.line().td().input(_class="btn btn-danger" if u.id == team_to_edit.user_id else "btn btn-primary", _id="set_participation_state", name=f"set_user_{u.id}", type="submit", value="Přiřadit účtu")
|
|
b(jinja_mac.form_field(form.set_no_user))
|
|
return b.print_file()
|
|
|
|
|
|
|
|
|
|
@app.route("/org/users")
|
|
def web_org_users():
|
|
users = db.get_session().query(db.User).order_by(db.User.id).all()
|
|
teams = db.get_session().query(db.Team).all()
|
|
games_for_user = {u.id: set() for u in users}
|
|
for t in teams:
|
|
if t.user_id is not None:
|
|
games_for_user[t.user_id].add(t.game)
|
|
b = BasePage()
|
|
b.h2("Uživatelé")
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Username")
|
|
b.line().th()("Hry")
|
|
b.line().th()("Akce")
|
|
for u in users:
|
|
with b.tr():
|
|
b.line().td()(user_link(u))
|
|
with b.td().ul():
|
|
for g in games_for_user[u.id]:
|
|
b.line().li(game_link(g))
|
|
with b.td():
|
|
with b.div(_class="btn-group", role="group"):
|
|
b.a(_class="btn btn-xs btn-primary", href=app.url_for(web_org_user.__name__, user_id=u.id))("Detail")
|
|
return b.print_file()
|
|
|
|
@app.route("/org/user/<int:user_id>", methods=['GET', 'POST'])
|
|
def web_org_user(user_id):
|
|
user = db.get_session().query(db.User).filter_by(id=user_id).one_or_none()
|
|
if not user:
|
|
raise werkzeug.exceptions.NotFound()
|
|
teams = db.get_session().query(db.Team).filter_by(user_id=user_id).order_by(db.Team.game_id).all()
|
|
b = BasePage()
|
|
b.line().h2("Uživatel ", user.print())
|
|
with b.div(_class="btn-group", role="group"):
|
|
with b.form(method="POST", _class="btn-group", action=app.url_for(web_org_user_su.__name__, user_id=user.id)):
|
|
b.button(_class="btn btn-default", type="submit", name="su", value="yes")("Převtělit")
|
|
b.h3("Týmy")
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Hra")
|
|
b.line().th()("Jméno z pohledu týmu")
|
|
b.line().th()("Číslo týmu")
|
|
for team in teams:
|
|
with b.tr():
|
|
b.line().td()(game_link(team.game))
|
|
b.line().td()(team.name)
|
|
b.line().td()(team.team_id)
|
|
return b.print_file()
|
|
|
|
@app.route("/org/user/<int:user_id>/su", methods=['POST'])
|
|
def web_org_user_su(user_id):
|
|
user = db.get_session().query(db.User).filter_by(id=user_id).one_or_none()
|
|
session['uid'] = user.id
|
|
flash("Uživatel vtělen!")
|
|
return redirect('/')
|
|
|
|
@app.route("/org/log/<int:log_id>", methods=['GET'])
|
|
def web_org_log(log_id):
|
|
l = db.get_session().query(db.Log).filter_by(log_id=log_id).one_or_none()
|
|
if l is None:
|
|
raise werkzeug.exceptions.NotFound()
|
|
b = BasePage()
|
|
b.line().h2("Log ", log_id)
|
|
b.p().pre(pprint.pformat(l.get_data()))
|
|
return b.print_file()
|
|
|
|
|
|
@app.route("/org/logs", methods=['GET'])
|
|
def web_org_logs():
|
|
logs = db.get_session().query(db.Log).order_by(db.Log.log_id.desc()).all()
|
|
b = BasePage()
|
|
|
|
b.line().h2("Logy")
|
|
with b.p().table(_class="data full"):
|
|
with b.thead():
|
|
b.line().th()("Čas")
|
|
b.line().th()("Uživatel")
|
|
b.line().th()("URL")
|
|
b.line().th()("Hra")
|
|
b.line().th()("Tým")
|
|
b.line().th()("Status")
|
|
b.line().th()("Popis")
|
|
b.line().th()("GET")
|
|
b.line().th()("Akce")
|
|
for l in logs:
|
|
with b.tr():
|
|
b.line().td(l.time.strftime("%H:%M:%S"))
|
|
b.line().td(l.user.print() if l.user else None, b._br(), l.source_ip)
|
|
b.line().td(l.endpoint._name_, b._br(), l.url)
|
|
b.line().td(l.game.print() if l.game else None)
|
|
b.line().td(l.team_id)
|
|
b.line().td(l.status)
|
|
b.line().td(l.text)
|
|
with b.line().td():
|
|
for k,v in l.get.items():
|
|
b(f"{k}: {v}").br()
|
|
with b.line().td():
|
|
with b.div(_class="btn-group", role="group"):
|
|
b.a(href=app.url_for(web_org_log.__name__, log_id=l.log_id), _class="btn btn-xs btn-primary")("Podrobnosti")
|
|
return b.print_file()
|
|
|
|
from hra.web.game import get_wlogic, wlogic_by_mode
|
|
|