You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
4.2 KiB
143 lines
4.2 KiB
import json
|
|
import hra.config as config
|
|
import hra.db as db
|
|
from hra.util import hash_passwd
|
|
from datetime import datetime
|
|
from sqlalchemy import exc, update
|
|
|
|
class DuplicitMakeStep(Exception):
    """Raised by ``game_step`` when the game is already flagged as
    ``working_on_next_state``, i.e. another step is in progress."""
|
|
class TooEarlyStep(Exception):
    """Raised by ``game_step`` when a user-requested step arrives less than
    5 seconds after the current state was created."""
|
|
|
|
def game_step(game_id: int, by_user: bool = False):
    """Advance the game ``game_id`` by one round.

    The game row is read with ``SELECT ... FOR UPDATE`` and flagged with
    ``working_on_next_state`` before the round is computed; a caller that
    finds the flag already set is rejected.  The flag is cleared again when
    the new round has been stored.

    Args:
        game_id: Identifier of the game to advance.
        by_user: When true, the step is refused unless at least 5 seconds
            have passed since the current state was created.

    Raises:
        DuplicitMakeStep: The game is already flagged as being stepped.
        TooEarlyStep: ``by_user`` is set and the current state is younger
            than 5 seconds.
    """
    ses = db.get_session()

    # Forget cached attributes so the locked query below reads fresh data.
    ses.expire_all()
    game = ses.query(db.Game).filter_by(game_id=game_id).with_for_update().one_or_none()
    assert game is not None
    step_time = datetime.now()  # becomes create_time of the new state
    if game.working_on_next_state:
        # End the transaction (and with it the row lock) before reporting.
        ses.commit()
        raise DuplicitMakeStep()

    old_round_id = game.current_round
    new_round_id = old_round_id + 1
    old_state = ses.query(db.State).filter_by(game_id=game.game_id, round=old_round_id).one_or_none()

    if by_user and (datetime.now() - old_state.create_time).total_seconds() < 5:
        # Nothing has been modified yet; end the transaction so the
        # FOR UPDATE lock is not kept while the caller handles the error
        # (same as the DuplicitMakeStep path above).
        ses.commit()
        raise TooEarlyStep()

    # Claim the step and publish the claim immediately.
    game.working_on_next_state = True
    ses.commit()

    # Collect each team's move for the old round and the points stored
    # with it; teams without a Move row keep ``None`` / ``0``.
    moves = [None] * game.teams_count
    points = [0] * game.teams_count
    for move in ses.query(db.Move).filter_by(game_id=game.game_id, round=old_round_id).all():
        moves[move.team_id] = move.get_move()
        points[move.team_id] = move.points

    ses.commit()

    x, add_points = game.get_logic().step(old_state.get_state(), moves, old_round_id)

    # New totals = points stored with the old round + points gained now.
    points = [a + b for a, b in zip(points, add_points)]

    new_state = db.State(game_id=game.game_id, round=new_round_id, state=db.new_big_data(x), create_time=step_time)
    ses.add(new_state)

    # Create the Move rows of the new round, carrying the running totals.
    for i in range(game.teams_count):
        db_move = db.Move(team_id=i, game_id=game_id, round=new_round_id, points=points[i])
        ses.add(db_move)

    # Re-read the game under lock and publish the new round.
    ses.expire_all()
    game = ses.query(db.Game).filter_by(game_id=game_id).with_for_update().one_or_none()
    assert game is not None
    assert game.working_on_next_state
    game.current_round = new_round_id
    game.working_on_next_state = False
    ses.commit()
|
|
|
|
def game_restore_broken(game_id: int) -> None:
    """Clear the ``working_on_next_state`` flag of game ``game_id``.

    The game row is fetched with ``FOR UPDATE``, the flag is reset to
    ``False`` and the change is committed.
    """
    session = db.get_session()
    session.expire_all()

    locked_games = session.query(db.Game).filter_by(game_id=game_id).with_for_update()
    broken_game = locked_games.one_or_none()
    broken_game.working_on_next_state = False

    session.commit()
|
|
|
|
|
|
def create_game(mode, teams_count, configuration=None, test_for=None, name=None, step_mode=db.StepMode.none):
    """Create a game with its initial state, teams and round-0 moves.

    The new objects are added to the current session; the session is
    flushed once after the game is added, but this function does not
    commit.

    Args:
        mode: Value stored as the game's ``game_mode``.
        teams_count: Number of teams (and round-0 moves) to create.
        configuration: Game configuration passed to ``db.new_big_data``;
            defaults to an empty dict.
        test_for: Optional user.  When given, every team is named
            ``test_<i>`` and assigned to this user's ``id``; otherwise
            teams get an empty name and no user.
        name: Optional game name.
        step_mode: Value stored as the game's ``step_mode``.

    Returns:
        The newly created ``db.Game``.
    """
    # A fresh dict per call instead of a shared mutable default.
    if configuration is None:
        configuration = {}

    ses = db.get_session()

    g = db.Game(game_mode=mode, configuration=db.new_big_data(configuration), teams_count=teams_count, name=name, step_mode=step_mode)

    ses.add(g)
    # g.game_id is read below; presumably it is assigned by the database
    # on flush (TODO confirm against the model).
    ses.flush()

    g.lock()

    s = db.State(game_id=g.game_id, round=0, state=db.new_big_data(g.get_logic().zero_state()), create_time=datetime.now())
    ses.add(s)

    for i in range(teams_count):
        if test_for is not None:
            t = db.Team(team_id=i, game_id=g.game_id, name=f"test_{i}", user_id=test_for.id)
        else:
            t = db.Team(team_id=i, game_id=g.game_id, name="")
        ses.add(t)

    for i in range(teams_count):
        db_move = db.Move(team_id=i, game_id=g.game_id, round=0)
        ses.add(db_move)

    g.current_round = 0
    g.working_on_next_state = False

    return g
|
|
|
|
def create_test_game(user):
    """Create the fixed four-team "occupy" test game for ``user``.

    All teams are assigned to ``user`` (via ``create_game(test_for=user)``)
    and the game uses ``db.StepMode.user``.  Returns whatever
    ``create_game`` returns.
    """
    # Hill layout passed under the "hills" key: ten rows of ten characters.
    hill_rows = [
        ".xx....x..",
        ".xxx...xx.",
        "...x......",
        ".......xx.",
        ".......xx.",
        ".......x..",
        "..........",
        "..........",
        "..........",
        "..........",
    ]
    test_configuration = {
        "teams_width": 2,
        "teams_height": 2,
        "width_per_team": 10,
        "height_per_team": 10,
        "born_per_round": [1],
        "initial_remaining_rounds": 1000,
        "spawn_price": 10,
        "last_spawn": 100,
        "hills": hill_rows,
    }
    game_name = f"Testovací hra uživatele {user.username}"
    return create_game(
        mode="occupy",
        teams_count=4,
        configuration=test_configuration,
        test_for=user,
        name=game_name,
        step_mode=db.StepMode.user,
    )
|
|
|
|
class UsernameExist(Exception):
    """Raised by ``create_user`` when storing the new user fails with a
    database integrity error."""
|
|
|
|
def create_user(username, passwd, org=False, test_game=True):
    """Create a user and, optionally, a test game for them.

    The user is added to the current session and flushed; this function
    does not commit.

    Args:
        username: Name of the new user.
        passwd: Plain-text password; stored after ``hash_passwd``.
        org: Value stored as the user's ``org`` flag.
        test_game: When true, ``create_test_game`` is called for the user.

    Returns:
        The newly created ``db.User``.

    Raises:
        UsernameExist: Flushing the user raised an ``IntegrityError``.
    """
    ses = db.get_session()
    u = db.User(org=org, username=username, passwd=hash_passwd(passwd))
    u.gen_token()
    try:
        ses.add(u)
        ses.flush()
    except exc.IntegrityError as e:
        # A failed flush leaves the session unusable until it is rolled
        # back; do it here so the caller can keep using the session.
        ses.rollback()
        raise UsernameExist() from e

    if test_game:
        create_test_game(u)
    return u
|
|
|
|
|