Web M&M
https://mam.matfyz.cz
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
458 lines
16 KiB
458 lines
16 KiB
#!/usr/bin/python3
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
# Names of the two PostgreSQL databases that are compared against each other.
OLD_DB = "mam_old"
NEW_DB = "mamweb"

# One connection per database, opened in the order old, new.
oldconn, newconn = (
    psycopg2.connect(f"dbname={db_name}") for db_name in (OLD_DB, NEW_DB)
)

# DictCursor rows can be indexed by column name, which the checks below rely on.
oldcur, newcur = (
    connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
    for connection in (oldconn, newconn)
)


# Relies on the module-level cursors oldcur and newcur!
def execute_simple(old_query, new_query=None):
    """Run one query on each database and return both result sets.

    ``old_query`` is executed on ``oldcur``. ``new_query`` is executed on
    ``newcur``; when it is omitted, the text of ``old_query`` is reused.

    Returns a tuple ``(old_rows, new_rows)`` with the ``fetchall()`` result
    of each cursor.

    Raises ValueError when the two cursors report different row counts.
    """
    effective_new_query = old_query if new_query is None else new_query

    oldcur.execute(old_query)
    newcur.execute(effective_new_query)

    old_count = oldcur.rowcount
    new_count = newcur.rowcount
    if old_count != new_count:
        raise ValueError(f"Queries '{old_query}' and '{effective_new_query}' returned different number of rows ({old_count} and {new_count})")

    old_rows = oldcur.fetchall()
    new_rows = newcur.fetchall()
    return (old_rows, new_rows)


def check_same(old_row, new_row, old_fields, new_fields=None):
    """Check that selected fields of two rows hold equal values.

    ``old_row`` and ``new_row`` are indexed with the given field names.
    ``old_fields`` is a list (or tuple) of field names, or a single field
    name. ``new_fields`` gives the corresponding names in ``new_row``; when
    omitted, the names from ``old_fields`` are used for both rows.

    Returns True when every pair of fields compares equal.

    Raises ValueError on the first differing pair, or when the two field
    lists do not have the same length.
    """
    # A bare field name is wrapped so that zip() below does not iterate over
    # the characters of the string.
    if not isinstance(old_fields, (list, tuple)):
        old_fields = [old_fields]

    if new_fields is None:
        new_fields = old_fields
    elif not isinstance(new_fields, (list, tuple)):
        new_fields = [new_fields]

    # zip() would silently drop the surplus fields of the longer list.
    if len(old_fields) != len(new_fields):
        raise ValueError(f"Field lists {old_fields} and {new_fields} have different lengths")

    for old_field, new_field in zip(old_fields, new_fields):
        if old_row[old_field] == new_row[new_field]:
            continue
        raise ValueError(f"Fields '{old_field}'({old_row[old_field]}) and '{new_field}'({new_row[new_field]}) differs for rows \n'{old_row}' and \n'{new_row}'")
    return True


def get_user_id_for_org_id(org_id):
    """Return the ``auth_user.id`` linked to an organizator in the new database.

    The lookup goes from ``seminar_organizator`` through ``seminar_osoby`` to
    ``auth_user`` and is executed on the module-level cursor ``newcur``.

    Raises ValueError when no matching row exists.
    """
    query = """SELECT auth_user.id FROM auth_user
        INNER JOIN seminar_osoby ON seminar_osoby.user_id = auth_user.id
        INNER JOIN seminar_organizator ON seminar_organizator.osoba_id = seminar_osoby.id
        WHERE seminar_organizator.id = %s """

    newcur.execute(query, (org_id,))
    row = newcur.fetchone()
    # fetchone() returns None when the query matched nothing; report that
    # explicitly instead of failing on the subscript below.
    if row is None:
        raise ValueError(f"No auth_user found for organizator id={org_id!r} in new db.")
    return row['id']


def check_skola():
    """Compare the rows of ``seminar_skoly`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order. A mismatch surfaces as the ValueError raised by
    ``execute_simple`` or ``check_same``.
    """
    compared_columns = [
        'id', 'aesop_id', 'izo', 'nazev', 'kratky_nazev', 'ulice',
        'mesto', 'psc', 'stat', 'je_zs', 'je_ss', 'poznamka',
    ]

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_skoly ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_resitel():
    """Compare ``seminar_resitele`` rows between the old and the new database.

    On the new side the personal columns are read from the joined
    ``seminar_osoby`` row and aliased to the names used in the old table
    (``datum_registrace`` is exposed as ``datum_prihlaseni``). Rows are
    compared pairwise in ``id`` order; a mismatch raises ValueError.
    """
    old_sql = 'SELECT * FROM seminar_resitele ORDER BY id'
    new_sql = '''SELECT seminar_resitele.id, skola_id, rok_maturity, zasilat, seminar_resitele.poznamka,
        o.jmeno AS jmeno, o.prijmeni AS prijmeni, o.user_id AS user_id, o.pohlavi_muz AS pohlavi_muz, o.email AS email, o.telefon AS telefon, o.datum_narozeni AS datum_narozeni,
        o.datum_souhlasu_udaje AS datum_souhlasu_udaje, o.datum_souhlasu_zasilani AS datum_souhlasu_zasilani, o.datum_registrace AS datum_prihlaseni, o.ulice AS ulice, o.mesto AS mesto, o.psc AS psc, o.stat AS stat
        FROM seminar_resitele JOIN seminar_osoby AS o ON seminar_resitele.osoba_id = o.id ORDER BY seminar_resitele.id'''

    # Columns that stay on seminar_resitele come first, followed by those
    # that the new query reads from seminar_osoby.
    compared_columns = [
        # kept on seminar_resitele
        'id',
        'skola_id',
        'rok_maturity',
        'zasilat',
        'poznamka',
        # read from seminar_osoby in the new database
        'jmeno',
        'prijmeni',
        'user_id',
        'pohlavi_muz',
        # 'email',  # TODO: still needs to be resolved because of the merge of resitel and organizator
        'telefon',
        'datum_narozeni',
        'datum_souhlasu_udaje',
        'datum_souhlasu_zasilani',
        'datum_prihlaseni',
        'ulice',
        'mesto',
        'psc',
        'stat',
    ]

    old_rows, new_rows = execute_simple(old_sql, new_sql)

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_reseni():
    """Compare ``seminar_reseni`` data between the old and the new database.

    Two checks are performed:

    1. Rows are compared pairwise in ``id`` order. On the new side the
       problem, points and ``cislo_body_id`` columns are read from the joined
       ``seminar_hodnoceni`` row.
    2. Every ``(id, resitel_id)`` pair of the old ``seminar_reseni`` table
       must be present in the new ``seminar_reseni_resitele`` table.

    Raises ValueError on the first mismatch.
    """
    old_query = 'SELECT * FROM seminar_reseni ORDER BY id'
    new_query = '''SELECT seminar_reseni.id, forma, poznamka, cas_doruceni, hodnoceni.problem_id AS h_problem_id, hodnoceni.body AS h_body, hodnoceni.cislo_body_id AS h_cislo_body_id
        FROM seminar_reseni
        JOIN seminar_hodnoceni AS hodnoceni ON seminar_reseni.id = hodnoceni.reseni_id
        ORDER BY id'''

    same_fields = ['id', 'forma', 'poznamka']
    renamed_fields = [
        ('timestamp', 'cas_doruceni'),
        ('problem_id', 'h_problem_id'),
        ('body', 'h_body'),
        ('cislo_body_id', 'h_cislo_body_id'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)

    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

    # Solvers are many-to-many in the new database, so they have to be
    # looked up in the link table.
    old_query = 'SELECT id, resitel_id FROM seminar_reseni ORDER BY id'
    new_query = 'SELECT reseni_id, resitele_id FROM seminar_reseni_resitele ORDER BY reseni_id'

    oldcur.execute(old_query)
    old_results = oldcur.fetchall()
    newcur.execute(new_query)
    # Rows are turned into tuples so they can be stored in a set; this avoids
    # scanning the whole new result list once per old row.
    new_pairs = {tuple(row) for row in newcur.fetchall()}

    for oldr in old_results:
        if tuple(oldr) not in new_pairs:
            raise ValueError(f'Reseni pair {oldr} not found in new db.')


def check_organizator():
    """Compare ``seminar_organizator`` rows between the old and the new database.

    On the new side the user id, nickname and photo are read through the
    joined ``seminar_osoby`` and ``auth_user`` rows. Besides the directly
    comparable columns, three things are checked for every row:

    * the old ``organizuje_od_roku`` / ``organizuje_do_roku`` values equal the
      ``.year`` of the new ``organizuje_od`` / ``organizuje_do`` values;
    * the nickname matches, where an old ``None`` is accepted for a new
      empty string.

    Raises ValueError on the first mismatch.
    """
    old_query = 'SELECT * FROM seminar_organizator ORDER BY id'
    new_query = '''SELECT seminar_organizator.id AS id, studuje, strucny_popis_organizatora, users.id AS uid, osoba.prezdivka AS o_prezdivka, osoba.foto AS o_foto, organizuje_od, organizuje_do
        FROM seminar_organizator
        JOIN seminar_osoby AS osoba ON osoba_id = osoba.id
        JOIN auth_user AS users ON osoba.user_id = users.id
        ORDER BY seminar_organizator.id'''

    same_fields = ['studuje', 'strucny_popis_organizatora']
    renamed_fields = [
        ('user_id', 'uid'),
        # ('prezdivka', 'o_prezdivka'),  # compared separately below
        ('foto', 'o_foto'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    # (old year column, new column whose .year is compared)
    year_columns = [
        ('organizuje_od_roku', 'organizuje_od'),
        ('organizuje_do_roku', 'organizuje_do'),
    ]

    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

        # organizuje od, do:
        for old_col, new_col in year_columns:
            new_value = n[new_col]
            # A missing new value is compared as None instead of crashing on
            # the .year attribute access.
            new_year = None if new_value is None else new_value.year
            if o[old_col] != new_year:
                raise ValueError(f'Not matching {new_col} for org id={o["id"]}: old {o[old_col]}, new {new_value}')

        if o['prezdivka'] == n['o_prezdivka']:
            continue
        if o['prezdivka'] is None and n['o_prezdivka'] == '':
            continue
        print(o, n)
        raise ValueError(f'Not matching prezdivka for org id={o["id"]}: old {o["prezdivka"]}, new {n["o_prezdivka"]}')


def check_rocnik():
    """Compare the rows of ``seminar_rocniky`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order; a mismatch raises ValueError.
    """
    compared_columns = ['id', 'prvni_rok', 'rocnik', 'exportovat']

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_rocniky ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_cislo():
    """Compare the rows of ``seminar_cisla`` between the old and the new database.

    The same query is run on both databases. All compared columns keep their
    name except the old ``cislo`` column, which is compared with the new
    ``poradi`` column. A mismatch raises ValueError.
    """
    # (old column, new column)
    column_pairs = [
        ('id', 'id'),
        ('rocnik_id', 'rocnik_id'),
        ('cislo', 'poradi'),
        ('datum_vydani', 'datum_vydani'),
        ('datum_deadline', 'datum_deadline'),
        ('verejne', 'verejne'),
        ('poznamka', 'poznamka'),
        ('pdf', 'pdf'),
    ]
    old_columns = [pair[0] for pair in column_pairs]
    new_columns = [pair[1] for pair in column_pairs]

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_cisla ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, old_columns, new_columns)


def check_priloha_reseni():
    """Compare the rows of ``seminar_priloha_reseni`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise. The old ``timestamp`` column is compared with the new
    ``vytvoreno`` column. A mismatch raises ValueError.
    """
    # Rows are paired by position, so both result sets need a defined order;
    # without ORDER BY the database may return them in any order.
    old_query = "SELECT * FROM seminar_priloha_reseni ORDER BY id"

    old_res, new_res = execute_simple(old_query)

    for o, n in zip(old_res, new_res):
        check_same(o, n,
                   ['id', 'reseni_id', 'timestamp', 'soubor', 'poznamka'],
                   ['id', 'reseni_id', 'vytvoreno', 'soubor', 'poznamka'])


def check_soustredeni():
    """Compare the rows of ``seminar_soustredeni`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order; a mismatch raises ValueError.
    """
    compared_columns = [
        'id', 'rocnik_id', 'datum_zacatku', 'datum_konce', 'verejne',
        'misto', 'text', 'typ', 'exportovat',
    ]

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_soustredeni ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)
    # Participants and organizers are checked in separate functions.


def check_soustredeni_ucastnici():
    """Compare the rows of ``seminar_soustredeni_ucastnici`` between both databases.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order; a mismatch raises ValueError.
    """
    compared_columns = ['id', 'resitel_id', 'soustredeni_id', 'poznamka']

    old_rows, new_rows = execute_simple(
        "SELECT * FROM seminar_soustredeni_ucastnici ORDER BY id"
    )

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_soustredeni_organizatori():
    """Compare the rows of ``seminar_soustredeni_organizatori`` between both databases.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order; a mismatch raises ValueError.
    """
    compared_columns = ['id', 'organizator_id', 'soustredeni_id', 'poznamka']

    old_rows, new_rows = execute_simple(
        "SELECT * FROM seminar_soustredeni_organizatori ORDER BY id"
    )

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_nastaveni():
    """Compare the rows of ``seminar_nastaveni`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order; a mismatch raises ValueError.
    """
    compared_columns = ['id', 'aktualni_cislo_id']

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_nastaveni ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)


def check_novinky():
    """Compare the rows of ``seminar_novinky`` between the old and the new database.

    The same query is run on both databases and the rows are compared
    pairwise in ``id`` order. The author is checked separately: the new
    ``autor_id`` is translated with ``get_user_id_for_org_id`` and the result
    must equal the old ``autor_id``.

    Raises ValueError on the first mismatch.
    """
    compared_columns = ['id', 'datum', 'text', 'obrazek', 'zverejneno']

    old_rows, new_rows = execute_simple("SELECT * FROM seminar_novinky ORDER BY id")

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared_columns)

        author_user_id = get_user_id_for_org_id(new_row['autor_id'])
        if author_user_id != old_row['autor_id']:
            raise ValueError("Nesedi autori u novinek")


def check_pohadka():
    """Compare ``seminar_pohadky`` rows between the old and the new database.

    On the new side the text is read from ``seminar_texty.na_web``, reached
    through the pohadka's tree node and that node's first child. The old
    ``timestamp`` column is compared with the new ``vytvoreno`` column.

    When the old row has an author, the new ``autor_id`` is translated with
    ``get_user_id_for_org_id`` and must equal the old ``autor_id``.

    Raises ValueError on the first mismatch.
    """
    old_sql = "SELECT * FROM seminar_pohadky ORDER BY id"
    new_sql = """SELECT sp.id AS id, sp.autor_id AS autor_id, sp.vytvoreno AS vytvoreno, snp.treenode_ptr_id AS treenode_ptr_id, st.na_web AS text
        FROM seminar_pohadky AS sp
        INNER JOIN seminar_nodes_pohadka AS snp ON sp.id = snp.pohadka_id
        INNER JOIN seminar_nodes_treenode AS snt ON snt.id = snp.treenode_ptr_id
        INNER JOIN seminar_nodes_obsah AS sno ON sno.treenode_ptr_id = snt.first_child_id
        INNER JOIN seminar_texty AS st ON sno.text_id = st.id

        ORDER BY sp.id"""

    # (old column, new column)
    column_pairs = [('id', 'id'), ('timestamp', 'vytvoreno'), ('text', 'text')]
    old_columns = [pair[0] for pair in column_pairs]
    new_columns = [pair[1] for pair in column_pairs]

    old_rows, new_rows = execute_simple(old_sql, new_sql)

    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, old_columns, new_columns)

        # Rows without an author in the old database are not author-checked.
        if old_row['autor_id'] is None:
            continue
        if get_user_id_for_org_id(new_row['autor_id']) != old_row['autor_id']:
            raise ValueError("Nesedi autori u pohadky")


# Problems are split by type:
def check_problem_common():
    """Compare the type-independent part of ``seminar_problemy`` between both databases.

    Two checks are performed:

    1. The same query is run on both databases and rows are compared pairwise
       in ``id`` order. The old ``text_org`` and ``timestamp`` columns are
       compared with the new ``poznamka`` and ``vytvoreno`` columns.
    2. Every ``(id, opravovatel_id)`` pair of the old ``seminar_problemy``
       table must be present in the new ``seminar_problemy_opravovatele``
       table.

    Raises ValueError on the first mismatch.
    """
    query = "SELECT * FROM seminar_problemy ORDER BY id"

    same_fields = ['id', 'nazev', 'stav', 'autor', 'kod']
    renamed_fields = [
        ('text_org', 'poznamka'),
        ('timestamp', 'vytvoreno'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(query)

    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

    # Opravovatele (correctors)
    old_query = "SELECT id, opravovatel_id FROM seminar_problemy"
    new_query = "SELECT problem_id, opravovatel_id FROM seminar_problemy_opravovatele"

    # psycopg2's cursor.execute() returns None, so the rows have to be
    # fetched from the cursor in a separate call.
    oldcur.execute(old_query)
    old_results = oldcur.fetchall()
    newcur.execute(new_query)
    # Rows are turned into tuples so they can be stored in a set; this avoids
    # scanning the whole new result list once per old row.
    new_pairs = {tuple(row) for row in newcur.fetchall()}

    # NOTE(review): an old row whose opravovatel_id is NULL is looked up as
    # (id, None) here — confirm whether such rows exist and should be skipped.
    for oldr in old_results:
        if tuple(oldr) not in new_pairs:
            raise ValueError(f'Opravovatel pair {oldr} not found in new db.')

    # FIXME: Zamereni? The dependency goes in the opposite direction, through
    # content type and object id. It could probably be hacked together, but it
    # is unclear whether it is worth the effort.


def check_uloha():
    """Compare problems of type ``uloha`` between the old and the new database.

    The old side reads ``seminar_problemy`` rows with ``typ = 'uloha'``. The
    new side reads ``seminar_ulohy`` joined with its ``seminar_problemy`` row
    and with the texts (``seminar_texty.na_web``) reached through the task's
    tree nodes. Rows are compared pairwise; the old ``id`` and ``body``
    columns are compared with the new ``problem_ptr_id`` and ``max_body``.

    Raises ValueError on the first mismatch.
    """
    old_query = "SELECT * FROM seminar_problemy WHERE typ = 'uloha' ORDER BY id"
    # The join conditions name problem.id explicitly: several of the joined
    # tables have an id column of their own, so a bare "id" is ambiguous.
    # NOTE(review): the solution text is joined through
    # seminar_nodes_uloha_vzorak; the original joined seminar_nodes_uloha_zadani
    # a second time, which yields the assignment text twice — confirm the
    # table name against the new schema.
    new_query = """SELECT cislo_zadani, cislo_reseni, problem_ptr_id, max_body, uzt.na_web AS text_zadani, uvt.na_web AS text_reseni
        FROM seminar_ulohy
        -- Problém:
        JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
        -- Text zadání:
        INNER JOIN seminar_nodes_uloha_zadani AS uzn ON problem.id = uzn.uloha_id
        INNER JOIN seminar_nodes_treenode AS uztn ON uztn.id = uzn.treenode_ptr_id
        INNER JOIN seminar_nodes_obsah AS uzo ON uzo.treenode_ptr_id = uztn.first_child_id
        INNER JOIN seminar_texty AS uzt ON uzo.text_id = uzt.id
        -- Text vzoráku:
        INNER JOIN seminar_nodes_uloha_vzorak AS uvn ON problem.id = uvn.uloha_id
        INNER JOIN seminar_nodes_treenode AS uvtn ON uvtn.id = uvn.treenode_ptr_id
        INNER JOIN seminar_nodes_obsah AS uvo ON uvo.treenode_ptr_id = uvtn.first_child_id
        INNER JOIN seminar_texty AS uvt ON uvo.text_id = uvt.id

        ORDER BY problem_ptr_id"""

    same_fields = ['cislo_zadani', 'cislo_reseni', 'text_zadani', 'text_reseni']
    renamed_fields = [
        ('id', 'problem_ptr_id'),
        ('body', 'max_body'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)

    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)
        # The deadline date looks empty, so that is what we assume.
        # NOTE(review): new_query does not select a datum_deadline column, so
        # this lookup raises KeyError as written — confirm which column and
        # which database this check is supposed to read.
        if n['datum_deadline'] is not None:
            raise ValueError("Úloha má deadline.")


def check_tema():
    """Compare problems of type ``tema`` and ``serial`` between both databases.

    The old side reads the texts and type from ``seminar_problemy`` and the
    ``rocnik_id`` from the ``seminar_cisla`` row referenced by
    ``cislo_zadani_id``. The new side reads ``seminar_temata`` and reaches the
    two texts and the ``rocnik_id`` through the tree-node tables. Rows are
    compared pairwise; the old ``typ`` is compared with the new ``tema_typ``.

    Raises ValueError on the first mismatch.
    """
    # ORDER BY names seminar_problemy.id explicitly: seminar_cisla is joined
    # in and has an id column too, so a bare "id" is ambiguous.
    old_query = """SELECT text_zadani, text_reseni, typ, c.rocnik_id AS rocnik_id
        FROM seminar_problemy
        INNER JOIN seminar_cisla AS c ON c.id = cislo_zadani_id
        WHERE typ IN ('tema', 'serial')
        ORDER BY seminar_problemy.id"""
    new_query = """SELECT tema_typ, zad_text.na_web AS text_zadani, res_text.na_web AS text_reseni, rn.rocnik_id AS rocnik_id
        FROM seminar_temata
        -- Problém:
        JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
        -- Text:
        -- TvCNode má dva potomky, oba TextNode. První drží původní text zadání, druhý řešení.
        INNER JOIN seminar_nodes_temavcisle AS tvcn ON tvcn.tema_id = problem.id
        INNER JOIN seminar_nodes_treenode AS ttn ON tvcn.treenode_ptr_id = ttn.id
        INNER JOIN seminar_nodes_treenode AS zad_tn ON ttn.first_child_id = zad_tn.id
        INNER JOIN seminar_nodes_treenode AS res_tn ON zad_tn.succ_id = res_tn.id
        INNER JOIN seminar_nodes_obsah AS zad_on ON zad_on.treenode_ptr_id = zad_tn.id
        INNER JOIN seminar_nodes_obsah AS res_on ON res_on.treenode_ptr_id = res_tn.id
        INNER JOIN seminar_texty AS zad_text ON zad_on.text_id = zad_text.id
        INNER JOIN seminar_texty AS res_text ON res_on.text_id = res_text.id
        -- Ročník tématu:
        -- Podle rootu TvCN
        INNER JOIN seminar_nodes_rocnik AS rn ON ttn.root_id = rn.treenode_ptr_id

        ORDER BY problem_ptr_id"""

    same_fields = ['text_zadani', 'text_reseni', 'rocnik_id']
    renamed_fields = [
        ('typ', 'tema_typ'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)

    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)


def check_konfera():
    """Fail if either database contains a problem of type ``konfera``.

    The old database is searched in ``seminar_problemy``; the new one in
    ``seminar_konfera`` joined with ``seminar_problemy``.

    Raises ValueError when either query reports a non-zero row count.
    """
    oldcur.execute("SELECT * FROM seminar_problemy WHERE typ = 'konfera'")
    newcur.execute("SELECT * FROM seminar_konfera JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id")

    row_counts = (oldcur.rowcount, newcur.rowcount)
    if any(count != 0 for count in row_counts):
        raise ValueError('There exists a Konfera!')


def check_org_clanek():
    """Fail if the old database contains a problem of type ``org-clanek``.

    Raises ValueError when the query reports a non-zero row count.
    """
    oldcur.execute("SELECT * FROM seminar_problemy WHERE typ = 'org-clanek'")

    found = oldcur.rowcount
    if found != 0:
        raise ValueError('There exists a Org-clanek!')


def check_res_clanek():
    """Compare problems of type ``res-clanek`` between the old and the new database.

    The old side reads ``seminar_problemy`` rows with ``typ = 'res-clanek'``.
    The new side reads ``seminar_clanky`` and reaches the text
    (``seminar_texty.na_web``) through the hodnoceni, reseni and tree-node
    tables. Rows are compared pairwise; the old ``cislo_zadani_id`` is
    compared with the new ``cislo_id``.

    Raises ValueError on the first mismatch.
    """
    # Two(!) articles have a text (zadani) that is supposed to be preserved.
    old_query = "SELECT * FROM seminar_problemy WHERE typ = 'res-clanek' ORDER BY id"
    # The seminar_nodes_obsah alias is "obsah": "on" is a reserved SQL keyword
    # and cannot be used as an unquoted table alias.
    new_query = """SELECT cislo_id, text.na_web AS text_zadani
        FROM seminar_clanky
        JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
        INNER JOIN seminar_hodnoceni AS hodn ON problem.id = hodn.problem_id
        INNER JOIN seminar_reseni AS rese ON rese.id = hodn.reseni_id
        INNER JOIN seminar_nodes_reseni AS resnode ON resnode.reseni_id = rese.id
        INNER JOIN seminar_nodes_treenode AS tn ON resnode.treenode_ptr_id = tn.id
        INNER JOIN seminar_nodes_obsah AS obsah ON obsah.treenode_ptr_id = tn.first_child_id
        INNER JOIN seminar_texty AS text ON text.id = obsah.text_id

        ORDER BY problem_ptr_id"""

    same_fields = ['text_zadani']
    renamed_fields = [
        ('cislo_zadani_id', 'cislo_id'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)

    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)


def check_untyped_problem():
    """Fail if the old database has a problem whose ``typ`` is not one of the known values.

    Raises ValueError when the query reports a non-zero row count.
    """
    oldcur.execute("SELECT * FROM seminar_problemy WHERE typ NOT IN ('uloha', 'tema', 'serial', 'konfera', 'org-clanek', 'res-clanek')")

    found = oldcur.rowcount
    if found != 0:
        raise ValueError('There exists a Problem without type!')


# Run every check in the listed order. Nothing here catches exceptions, so the
# first check that raises stops the script.
for run_check in (
    check_skola,
    check_resitel,
    check_reseni,
    check_organizator,
    check_rocnik,
    check_cislo,
    check_priloha_reseni,
    check_soustredeni,
    check_soustredeni_ucastnici,
    check_soustredeni_organizatori,
    check_nastaveni,
    check_novinky,
    check_pohadka,
    # Problems: the common part first, then one check per type.
    check_problem_common,
    check_uloha,
    check_tema,
    check_konfera,
    check_org_clanek,
    check_res_clanek,
    check_untyped_problem,
):
    run_check()