Browse Source

Implementace zbytku problémů

To, co zbývá, je označené buď jako TODO, nebo jako FIXME.
middleware_test
Pavel "LEdoian" Turinsky 3 years ago
parent
commit
96404abd20
  1. 121
      db_compare.py

121
db_compare.py

@ -104,7 +104,6 @@ def check_reseni():
same_fields = ['id', 'forma', 'poznamka']
renamed_fields = [('timestamp', 'cas_doruceni'),
# Also moved fields
('problem_id', 'hodnoceni.problem_id'),
('body', 'hodnoceni.body'),
('cislo_body_id', 'hodnoceni.cislo_body_id'),
@ -126,11 +125,11 @@ def check_reseni():
oldcur = oldconn.cursor()
old_results = oldcur.execute(old_query).fetchall()
newcur = newconn.cursor()
new_results = newcur.execute(old_query).fetchall()
new_results = newcur.execute(new_query).fetchall()
for oldr in old_results:
if oldr not in new_results:
raise ValueError(f'Pair {oldr} not found in new db.')
raise ValueError(f'Reseni pair {oldr} not found in new db.')
def check_organizator():
old_query = 'SELECT * FROM seminar_organizatori ORDER BY id'
@ -255,10 +254,106 @@ def check_pohadka():
# Problémy jsou rozdělené podle typů:
def check_problem_common():
query = "SELECT * FROM seminar_problemy ORDER BY id"
same_fields = ['id', 'nazev', 'stav', 'autor', 'kod']
renamed_fields = [
('text_org', 'poznamka'),
('timestamp', 'vytvoreno'),
]
old_fields = same_fields + [f[0] for f in renamed_fields]
new_fields = same_fields + [f[1] for f in renamed_fields]
old_res, new_res = execute_simple(query)
res = zip(old_res,new_res)
for o,n in res:
check_same(o,n, old_fields, new_fields)
# Opravovatelé
old_query = "SELECT id, opravovatel_id FROM seminar_problemy"
new_query = "SELECT problem_id, opravovatel_id FROM seminar_problemy_opravovatele"
# Simple cursors
oldcur = oldconn.cursor()
old_results = oldcur.execute(old_query).fetchall()
newcur = newconn.cursor()
new_results = newcur.execute(new_query).fetchall()
for oldr in old_results:
if oldr not in new_results:
raise ValueError(f'Opravovatel pair {oldr} not found in new db.')
# FIXME: Zaměření?
def check_uloha():
raise NotImplementedError()
old_query = "SELECT * FROM seminar_problemy WHERE typ = 'uloha' ORDER BY id"
new_query = """SELECT cislo_zadani, cislo_reseni, problem_ptr_id, max_body, uzt.na_web AS text_zadani, uvt.na_web AS text_reseni
FROM seminar_ulohy
-- Problém:
JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
-- Text zadání:
INNER JOIN seminar_nodes_uloha_zadani AS uzn ON id = uzn.uloha_id
INNER JOIN seminar_nodes_treenode AS uztn ON uztn.id = uzn.treenode_ptr_id
INNER JOIN seminar_nodes_obsah AS uzo ON uzo.treenode_ptr_id = uztn.first_child_id
INNER JOIN seminar_texty AS uzt ON uzo.text_id = uzt.id
-- Text vzoráku:
INNER JOIN seminar_nodes_uloha_zadani AS uvn ON id = uvn.uloha_id
INNER JOIN seminar_nodes_treenode AS uvtn ON uvtn.id = uvn.treenode_ptr_id
INNER JOIN seminar_nodes_obsah AS uvo ON uvo.treenode_ptr_id = uvtn.first_child_id
INNER JOIN seminar_texty AS uvt ON uvo.text_id = uvt.id
ORDER BY problem_ptr_id"""
same_fields = ['cislo_zadani', 'cislo_reseni', 'text_zadani', 'text_reseni']
renamed_fields = [
('id', 'problem_ptr_id'),
('body', 'max_body'),
]
old_fields = same_fields + [f[0] for f in renamed_fields]
new_fields = same_fields + [f[1] for f in renamed_fields]
old_res, new_res = execute_simple(old_query, new_query)
res = zip(old_res,new_res)
for o,n in res:
check_same(o,n, old_fields, new_fields)
# TODO: cislo_deadline
def check_tema():
raise NotImplementedError()
old_query = "SELECT * FROM seminar_problemy WHERE typ IN ('tema', 'serial') ORDER BY id"
new_query = """SELECT tema_typ, zad_text.na_web as text_zadani, res_text.na_web as text_reseni
FROM seminar_temata
-- Problém:
JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
-- Text:
-- TvCNode dva potomky, oba TextNode. První drží původní text zadání, druhý řešení.
INNER JOIN seminar_nodes_temavcisle as tvcn ON tvcn.tema_id = id
INNER JOIN seminar_nodes_treenode AS roottn ON tvcn.treenode_ptr_id = roottn.id
INNER JOIN seminar_nodes_treenode AS zad_tn ON roottn.first_child_id = zad_tn.id
INNER JOIN seminar_nodes_treenode AS res_tn ON zad_tn.succ_id = res_tn.id
INNER JOIN seminar_nodes_obsah AS zad_on ON zad_on.treenode_ptr_id = zad_tn.id
INNER JOIN seminar_nodes_obsah AS res_on ON res_on.treenode_ptr_id = res_tn.id
INNER JOIN seminar_texty as zad_text ON zad_on.text_id = zad_text.id
INNER JOIN seminar_texty as res_text ON res_on.text_id = res_text.id
ORDER BY problem_ptr_id"""
same_fields = ['text_zadani', 'text_reseni']
renamed_fields = [
('typ', 'tema_typ'),
]
old_fields = same_fields + [f[0] for f in renamed_fields]
new_fields = same_fields + [f[1] for f in renamed_fields]
old_res, new_res = execute_simple(old_query, new_query)
res = zip(old_res,new_res)
for o,n in res:
check_same(o,n, old_fields, new_fields)
#TODO: Tema.rocnik
def check_konfera():
old_query = "SELECT * FROM seminar_problemy WHERE typ = 'konfera'"
new_query = "SELECT * FROM seminar_konfera JOIN seminar_problemy as problem ON problem_ptr_id = problem.id"
@ -278,7 +373,20 @@ def check_org_clanek():
raise ValueError('There exists a Org-clanek!')
def check_res_clanek():
raise NotImplementedError()
old_query = "SELECT * FROM seminar_problemy WHERE typ = 'res-clanek' ORDER BY id"
new_query = "SELECT * FROM seminar_clanky JOIN seminar_problemy as problem ON problem_ptr_id = problem.id ORDER BY problem_ptr_id"
same_fields = []
renamed_fields = [
]
old_fields = same_fields + [f[0] for f in renamed_fields]
new_fields = same_fields + [f[1] for f in renamed_fields]
old_res, new_res = execute_simple(old_query, new_query)
res = zip(old_res,new_res)
for o,n in res:
check_same(o,n, old_fields, new_fields)
# TODO: Cislo
def check_untyped_problem():
old_query = "SELECT * FROM seminar_problemy WHERE typ NOT IN ('uloha', 'tema', 'serial', 'konfera', 'org-clanek', 'res-clanek')"
@ -304,6 +412,7 @@ check_nastaveni()
check_novinky()
check_pohadka()
check_problem_common()
check_uloha()
check_tema()
check_konfera()

Loading…
Cancel
Save