#!/usr/bin/env python3
import psycopg2
import psycopg2 . extras
# Names of the two databases that are compared against each other.
OLD_DB = " mam_old "
NEW_DB = " mamweb "

# One connection per database.
oldconn = psycopg2.connect(f" dbname= { OLD_DB } ")
newconn = psycopg2.connect(f" dbname= { NEW_DB } ")

# DictCursor rows can be indexed by column name, which every check below
# relies on.  Both cursors are module-level globals shared by all checks.
oldcur, newcur = (
    conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    for conn in (oldconn, newconn)
)
# Uses global variables oldcur, newcur!
def execute_simple(old_query, new_query=None):
    """Run one query per database and hand back both result sets.

    ``old_query`` is executed on the module-level ``oldcur`` and
    ``new_query`` on ``newcur``.  When ``new_query`` is omitted, the same SQL
    is used for both databases.

    Returns:
        A ``(old_rows, new_rows)`` tuple with the ``fetchall()`` result of
        each cursor.

    Raises:
        ValueError: if the two cursors report a different ``rowcount``.
    """
    new_query = old_query if new_query is None else new_query

    oldcur.execute(old_query)
    newcur.execute(new_query)

    if oldcur.rowcount == newcur.rowcount:
        return (oldcur.fetchall(), newcur.fetchall())
    raise ValueError(f" Queries ' { old_query } ' and ' { new_query } ' returned different number of rows ( { oldcur . rowcount } and { newcur . rowcount } ) ")
def check_same(old_row, new_row, old_fields, new_fields=None):
    """Verify that selected fields of two rows hold equal values.

    Args:
        old_row: mapping-like row from the old database.
        new_row: mapping-like row from the new database.
        old_fields: a single key, or a list/tuple of keys, to read from
            ``old_row``.
        new_fields: the matching key(s) in ``new_row``; defaults to
            ``old_fields``.  Paired with ``old_fields`` position by position.

    Returns:
        ``True`` when every pair of fields compares equal.

    Raises:
        ValueError: if the two field lists differ in length, or if any pair
            of values differs.
    """
    def _as_field_list(fields):
        # A lone key (e.g. a string) becomes a one-element list.  Strings
        # must never be iterated character by character.
        if isinstance(fields, (list, tuple)):
            return list(fields)
        return [fields]

    old_fields = _as_field_list(old_fields)
    new_fields = old_fields if new_fields is None else _as_field_list(new_fields)

    # zip() would silently drop the unmatched tail and skip those checks.
    if len(old_fields) != len(new_fields):
        raise ValueError(
            f"Field lists differ in length: {old_fields!r} vs {new_fields!r}"
        )

    for old_field, new_field in zip(old_fields, new_fields):
        if old_row[old_field] == new_row[new_field]:
            continue
        raise ValueError(f" Fields ' { old_field } ' ( { old_row [ old_field ] } ) and ' { new_field } ' ( { new_row [ new_field ] } ) differs for rows \n ' { old_row } ' and \n ' { new_row } ' ")
    return True
def get_user_id_for_org_id(org_id):
    """Look up, in the new database, the ``auth_user`` id behind an organizator.

    The lookup goes seminar_organizator -> seminar_osoby -> auth_user and
    runs on the module-level ``newcur``.

    Args:
        org_id: id of a row in ``seminar_organizator``.

    Returns:
        The id column of the first matching row.

    Raises:
        ValueError: if the query returns no row for ``org_id``.
    """
    query = """ SELECT auth_user.id FROM auth_user
INNER JOIN seminar_osoby ON seminar_osoby . user_id = auth_user . id
INNER JOIN seminar_organizator ON seminar_organizator . osoba_id = seminar_osoby . id
WHERE seminar_organizator . id = % s """
    newcur.execute(query, (org_id,))
    row = newcur.fetchone()
    # fetchone() returns None when nothing matched.  Report that explicitly
    # instead of failing with an opaque TypeError on the subscript below.
    if row is None:
        raise ValueError(f"No auth_user found for organizator id {org_id!r}")
    return row[' id ']
def check_skola():
    """Compare ``seminar_skoly`` row by row between the old and new database.

    The same query is run on both sides; ``check_same`` raises ``ValueError``
    on the first differing field.
    """
    columns = [' id ', ' aesop_id ', ' izo ', ' nazev ', ' kratky_nazev ', ' ulice ', ' mesto ', ' psc ', ' stat ', ' je_zs ', ' je_ss ', ' poznamka ']
    old_rows, new_rows = execute_simple(" SELECT * FROM seminar_skoly ORDER BY id ")
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, columns)
def check_resitel():
    """Compare ``seminar_resitele`` between the old and new database.

    The new query joins ``seminar_osoby`` and aliases its columns so that
    each row can be compared with the old row under identical field names.
    """
    # Columns read from seminar_resitele itself in the new query.
    own_columns = [
        ' id ',
        ' skola_id ',
        ' rok_maturity ',
        ' zasilat ',
        ' poznamka ',
    ]
    # Columns the new query reads from the joined seminar_osoby row.
    person_columns = [
        ' jmeno ',
        ' prijmeni ',
        ' user_id ',
        ' pohlavi_muz ',
        # TODO: 'email' is not compared yet; it still has to be resolved
        # because of the merge of solver and organizer.
        ' telefon ',
        ' datum_narozeni ',
        ' datum_souhlasu_udaje ',
        ' datum_souhlasu_zasilani ',
        ' datum_prihlaseni ',
        ' ulice ',
        ' mesto ',
        ' psc ',
        ' stat ',
    ]

    old_query = ' SELECT * FROM seminar_resitele ORDER BY id '
    new_query = ''' SELECT seminar_resitele.id, skola_id, rok_maturity, zasilat, seminar_resitele.poznamka,
o . jmeno AS jmeno , o . prijmeni AS prijmeni , o . user_id AS user_id , o . pohlavi_muz AS pohlavi_muz , o . email AS email , o . telefon AS telefon , o . datum_narozeni AS datum_narozeni ,
o . datum_souhlasu_udaje AS datum_souhlasu_udaje , o . datum_souhlasu_zasilani AS datum_souhlasu_zasilani , o . datum_registrace AS datum_prihlaseni , o . ulice AS ulice , o . mesto AS mesto , o . psc AS psc , o . stat AS stat
FROM seminar_resitele JOIN seminar_osoby AS o ON seminar_resitele . osoba_id = o . id ORDER BY seminar_resitele . id '''

    compared = own_columns + person_columns
    old_rows, new_rows = execute_simple(old_query, new_query)
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, compared)
def check_reseni():
    """Compare ``seminar_reseni`` between the old and new database.

    Two passes:

    1. Row-by-row field comparison.  The new query joins
       ``seminar_hodnoceni`` and aliases its columns (``h_*``) so they can be
       compared with the old row.
    2. Every ``(id, resitel_id)`` pair of the old table must occur as a
       ``(reseni_id, resitele_id)`` pair in ``seminar_reseni_resitele``.

    Raises:
        ValueError: on the first differing field or missing pair.
    """
    old_query = ' SELECT * FROM seminar_reseni ORDER BY id '
    new_query = ''' SELECT seminar_reseni.id, forma, poznamka, cas_doruceni, hodnoceni.problem_id AS h_problem_id, hodnoceni.body AS h_body, hodnoceni.cislo_body_id AS h_cislo_body_id
FROM seminar_reseni
JOIN seminar_hodnoceni AS hodnoceni ON seminar_reseni . id = hodnoceni . reseni_id
ORDER BY id '''

    same_fields = [' id ', ' forma ', ' poznamka ']
    # (old name, new name) pairs.
    renamed_fields = [
        (' timestamp ', ' cas_doruceni '),
        (' problem_id ', ' h_problem_id '),
        (' body ', ' h_body '),
        (' cislo_body_id ', ' h_cislo_body_id '),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

    # Solvers are many-to-many in the new schema, so they are looked up in
    # the link table.
    old_query = ' SELECT id, resitel_id FROM seminar_reseni ORDER BY id '
    new_query = ' SELECT reseni_id, resitele_id FROM seminar_reseni_resitele ORDER BY reseni_id '
    oldcur.execute(old_query)
    old_results = oldcur.fetchall()
    newcur.execute(new_query)
    # A set of value tuples gives O(1) membership tests; scanning the list of
    # new rows for every old row is quadratic.
    new_pairs = {tuple(row) for row in newcur.fetchall()}
    for oldr in old_results:
        if tuple(oldr) not in new_pairs:
            raise ValueError(f' Reseni pair { oldr } not found in new db. ')
def check_organizator():
    """Compare ``seminar_organizator`` between the old and new database.

    The new query joins ``seminar_osoby`` and ``auth_user``.  On top of the
    plain field comparison, each row pair is checked for:

    * ``organizuje_od_roku`` / ``organizuje_do_roku`` (old) against the
      ``.year`` of ``organizuje_od`` / ``organizuje_do`` (new); both sides
      being NULL counts as a match;
    * ``prezdivka`` (old) against ``o_prezdivka`` (new), where an old NULL
      matches a new ``' '`` value.

    Raises:
        ValueError: on the first mismatch.
    """
    old_query = ' SELECT * FROM seminar_organizator ORDER BY id '
    new_query = ''' SELECT seminar_organizator.id AS id, studuje, strucny_popis_organizatora, users.id AS uid, osoba.prezdivka AS o_prezdivka, osoba.foto AS o_foto, organizuje_od, organizuje_do
FROM seminar_organizator
JOIN seminar_osoby AS osoba ON osoba_id = osoba . id
JOIN auth_user AS users ON osoba . user_id = users . id
ORDER BY seminar_organizator . id '''

    same_fields = [' studuje ', ' strucny_popis_organizatora ']
    # (old name, new name) pairs.  prezdivka is not listed here because it
    # is compared separately at the end of the loop body.
    renamed_fields = [
        (' user_id ', ' uid '),
        (' foto ', ' o_foto '),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

        # organizuje_od: old side is a year, new side is read via ``.year``.
        # The guard must look at the *od* columns; testing the *do* columns
        # here skipped or mis-reported rows.
        old_od, new_od = o[' organizuje_od_roku '], n[' organizuje_od ']
        if old_od is None and new_od is None:
            pass
        elif old_od is None or new_od is None or old_od != new_od.year:
            # A one-sided NULL is a mismatch too, and is reported instead of
            # reaching ``.year`` on None.
            raise ValueError(f' Not matching organizuje_od for org id= { o [ " id " ] } : old { o [ " organizuje_od_roku " ] } , new { n [ " organizuje_od " ] } ')

        # organizuje_do: same rule as above.
        old_do, new_do = o[' organizuje_do_roku '], n[' organizuje_do ']
        if old_do is None and new_do is None:
            pass
        elif old_do is None or new_do is None or old_do != new_do.year:
            raise ValueError(f' Not matching organizuje_do for org id= { o [ " id " ] } : old { o [ " organizuje_do_roku " ] } , new { n [ " organizuje_do " ] } ')

        # prezdivka: equal values match, and an old NULL matches a new ' '.
        if o[' prezdivka '] == n[' o_prezdivka ']:
            continue
        if o[' prezdivka '] is None and n[' o_prezdivka '] == ' ':
            continue
        raise ValueError(f' Not matching prezdivka for org id= { o [ " id " ] } : old { o [ " prezdivka " ] } , new { n [ " o_prezdivka " ] } ')
def check_rocnik():
    """Compare ``seminar_rocniky`` row by row between the old and new database."""
    columns = [' id ', ' prvni_rok ', ' rocnik ', ' exportovat ']
    for old_row, new_row in zip(*execute_simple(" SELECT * FROM seminar_rocniky ORDER BY id ")):
        check_same(old_row, new_row, columns)
def check_cislo():
    """Compare ``seminar_cisla`` row by row between the old and new database.

    The two column lists are paired by position, so the third old column is
    compared with the third new column, which has a different name.
    """
    old_columns = [' id ', ' rocnik_id ', ' cislo ', ' datum_vydani ', ' datum_deadline ', ' verejne ', ' poznamka ', ' pdf ']
    new_columns = [' id ', ' rocnik_id ', ' poradi ', ' datum_vydani ', ' datum_deadline ', ' verejne ', ' poznamka ', ' pdf ']

    old_rows, new_rows = execute_simple(" SELECT * FROM seminar_cisla ORDER BY id ")
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, old_columns, new_columns)
def check_priloha_reseni():
    """Compare ``seminar_priloha_reseni`` row by row between the databases.

    The two column lists are paired by position, so the third old column is
    compared with the third new column, which has a different name.

    Raises:
        ValueError: on the first differing field (via ``check_same``).
    """
    # Rows are compared pairwise, so both result sets need a deterministic
    # order.  Without ORDER BY the databases may return rows in any order.
    old_query = " SELECT * FROM seminar_priloha_reseni ORDER BY id "
    old_res, new_res = execute_simple(old_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, [' id ', ' reseni_id ', ' timestamp ', ' soubor ', ' poznamka '],
                   [' id ', ' reseni_id ', ' vytvoreno ', ' soubor ', ' poznamka '])
def check_soustredeni():
    """Compare ``seminar_soustredeni`` row by row between the databases.

    Participants and organizers are checked in separate functions.
    """
    columns = [' id ', ' rocnik_id ', ' datum_zacatku ', ' datum_konce ', ' verejne ', ' misto ', ' text ', ' typ ', ' exportovat ']
    old_rows, new_rows = execute_simple(" SELECT * FROM seminar_soustredeni ORDER BY id ")
    for pair in zip(old_rows, new_rows):
        check_same(pair[0], pair[1], columns)
def check_soustredeni_ucastnici():
    """Compare ``seminar_soustredeni_ucastnici`` row by row between the databases."""
    columns = [' id ', ' resitel_id ', ' soustredeni_id ', ' poznamka ']
    sql = " SELECT * FROM seminar_soustredeni_ucastnici ORDER BY id "
    for old_row, new_row in zip(*execute_simple(sql)):
        check_same(old_row, new_row, columns)
def check_soustredeni_organizatori():
    """Compare ``seminar_soustredeni_organizatori`` row by row between the databases."""
    columns = [' id ', ' organizator_id ', ' soustredeni_id ', ' poznamka ']
    old_rows, new_rows = execute_simple(" SELECT * FROM seminar_soustredeni_organizatori ORDER BY id ")
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, columns)
def check_nastaveni():
    """Compare ``seminar_nastaveni`` row by row between the databases."""
    sql = " SELECT * FROM seminar_nastaveni ORDER BY id "
    old_rows, new_rows = execute_simple(sql)
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, [' id ', ' aktualni_cislo_id '])
def check_novinky():
    """Compare ``seminar_novinky`` row by row between the databases.

    Besides the plain fields, the author is verified: the new row's
    ``autor_id`` is translated with ``get_user_id_for_org_id`` and must equal
    the old row's ``autor_id``.

    Raises:
        ValueError: on a differing field or a mismatching author.
    """
    plain_columns = [' id ', ' datum ', ' text ', ' obrazek ', ' zverejneno ']
    old_rows, new_rows = execute_simple(" SELECT * FROM seminar_novinky ORDER BY id ")
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, plain_columns)
        author_user_id = get_user_id_for_org_id(new_row[' autor_id '])
        if author_user_id == old_row[' autor_id ']:
            continue
        raise ValueError(" Nesedi autori u novinek ")
def check_pohadka():
    """Compare ``seminar_pohadky`` between the old and new database.

    The new query reads the story text through the node tables and exposes
    the ``uloha_id`` of the neighbouring nodes as ``uloha_pred`` and
    ``uloha_po``.  For every row pair the function checks:

    * the plain fields (the old ``timestamp`` is paired with ``vytvoreno``);
    * the author, unless the old ``autor_id`` is NULL;
    * that the old ``uloha_id`` equals ``uloha_pred`` when the old ``pred``
      flag is truthy, and ``uloha_po`` otherwise.

    Raises:
        ValueError: on the first mismatch.
    """
    old_query = " SELECT * FROM seminar_pohadky ORDER BY id "
    new_query = """ SELECT sp.id AS id, sp.autor_id AS autor_id, sp.vytvoreno AS vytvoreno, snp.treenode_ptr_id AS treenode_ptr_id, st.na_web AS text,
zn_pred . uloha_id AS uloha_pred , zn_po . uloha_id AS uloha_po
FROM seminar_pohadky AS sp
- - Text pohádky
INNER JOIN seminar_nodes_pohadka AS snp ON sp . id = snp . pohadka_id
INNER JOIN seminar_nodes_treenode AS snt ON snt . id = snp . treenode_ptr_id
INNER JOIN seminar_nodes_obsah AS sno ON sno . treenode_ptr_id = snt . first_child_id
INNER JOIN seminar_texty AS st ON sno . text_id = st . id
- - Predchozí úloha
LEFT OUTER JOIN seminar_nodes_treenode AS ztn_pred ON ztn_pred . succ_id = snt . id
LEFT OUTER JOIN seminar_nodes_uloha_zadani AS zn_pred ON zn_pred . treenode_ptr_id = ztn_pred . id
- - Následující úloha
LEFT OUTER JOIN seminar_nodes_uloha_zadani AS zn_po ON zn_po . treenode_ptr_id = snt . succ_id
ORDER BY sp . id """

    old_columns = [' id ', ' timestamp ', ' text ']
    new_columns = [' id ', ' vytvoreno ', ' text ']

    for o, n in zip(*execute_simple(old_query, new_query)):
        check_same(o, n, old_columns, new_columns)

        # The author is only verified when the old row has one.
        if o[' autor_id '] is not None and get_user_id_for_org_id(n[' autor_id ']) != o[' autor_id ']:
            raise ValueError(" Nesedi autori u pohadky ")

        # Pick the neighbouring task the story should be attached to.
        if o[' pred ']:
            spravny_klic = ' uloha_pred '
        else:
            spravny_klic = ' uloha_po '
        if o[' uloha_id '] != n[spravny_klic]:
            raise ValueError(f" Pohádka přidružená ke špatné úloze! old: { o [ ' uloha_id ' ] } , new: { n [ spravny_klic ] } , pozice: { spravny_klic } ")
# Problémy jsou rozdělené podle typů:
def check_problem_common():
    """Compare the type-independent part of ``seminar_problemy``.

    Two passes:

    1. Row-by-row field comparison, with the same query on both databases.
       The old ``text_org`` is paired with ``poznamka`` and the old
       ``timestamp`` with ``vytvoreno``.
    2. Every ``(id, opravovatel_id)`` pair of the old table must occur as a
       ``(problem_id, opravovatel_id)`` pair in
       ``seminar_problemy_opravovatele``.

    Zaměření (focus areas) are not checked here; they are to be verified by
    eye (#1186).

    Raises:
        ValueError: on the first differing field or missing pair.
    """
    query = " SELECT * FROM seminar_problemy ORDER BY id "
    same_fields = [' id ', ' nazev ', ' stav ', ' autor_id ', ' kod ']
    # (old name, new name) pairs.
    renamed_fields = [
        (' text_org ', ' poznamka '),
        (' timestamp ', ' vytvoreno '),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

    # Opravovatelé (graders).
    old_query = " SELECT id, opravovatel_id FROM seminar_problemy "
    new_query = " SELECT problem_id, opravovatel_id FROM seminar_problemy_opravovatele "
    # psycopg2's cursor.execute() returns None, so execute and fetchall must
    # be separate calls; chaining them raises AttributeError.
    oldcur.execute(old_query)
    old_results = oldcur.fetchall()
    newcur.execute(new_query)
    # A set of value tuples gives O(1) membership tests instead of scanning
    # the whole list of new rows for every old row.
    new_pairs = {tuple(row) for row in newcur.fetchall()}
    # NOTE(review): old rows with a NULL opravovatel_id presumably have no
    # counterpart in the link table and would be reported here. Confirm
    # whether such rows exist and should be skipped.
    for oldr in old_results:
        if tuple(oldr) not in new_pairs:
            raise ValueError(f' Opravovatel pair { oldr } not found in new db. ')
def check_uloha():
    """Compare problems of type ``uloha`` between the old and new database.

    The new query joins ``seminar_ulohy`` with its problem row and reads the
    assignment and solution texts through the node tables.  After the field
    comparison, each new row must have an empty ``datum_deadline``.

    Raises:
        ValueError: on a differing field or a non-NULL ``datum_deadline``.
    """
    old_query = " SELECT * FROM seminar_problemy WHERE typ = ' uloha ' ORDER BY id "
    # datum_deadline is in the SELECT list because the loop below reads it
    # from the new row; without it that lookup fails with KeyError.
    # NOTE(review): assumes the joined new tables expose exactly one
    # datum_deadline column. Confirm against the new schema.
    # NOTE(review): the "Text vzoráku" joins use seminar_nodes_uloha_zadani
    # with the same condition as the "Text zadání" joins. Confirm whether a
    # different node table was intended for the solution text.
    new_query = """ SELECT cislo_zadani_id, cislo_reseni_id, problem_ptr_id, max_body, uzt.na_web AS text_zadani, uvt.na_web AS text_reseni, datum_deadline
FROM seminar_ulohy
- - Problém :
JOIN seminar_problemy AS problem ON problem_ptr_id = problem . id
- - Text zadání :
INNER JOIN seminar_nodes_uloha_zadani AS uzn ON problem . id = uzn . uloha_id
INNER JOIN seminar_nodes_treenode AS uztn ON uztn . id = uzn . treenode_ptr_id
INNER JOIN seminar_nodes_obsah AS uzo ON uzo . treenode_ptr_id = uztn . first_child_id
INNER JOIN seminar_texty AS uzt ON uzo . text_id = uzt . id
- - Text vzoráku :
INNER JOIN seminar_nodes_uloha_zadani AS uvn ON problem . id = uvn . uloha_id
INNER JOIN seminar_nodes_treenode AS uvtn ON uvtn . id = uvn . treenode_ptr_id
INNER JOIN seminar_nodes_obsah AS uvo ON uvo . treenode_ptr_id = uvtn . first_child_id
INNER JOIN seminar_texty AS uvt ON uvo . text_id = uvt . id
ORDER BY problem_ptr_id """

    same_fields = [' cislo_zadani_id ', ' cislo_reseni_id ', ' text_zadani ', ' text_reseni ']
    # (old name, new name) pairs.
    renamed_fields = [
        (' id ', ' problem_ptr_id '),
        (' body ', ' max_body '),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)
        # The deadline date appears to be empty everywhere, so that is
        # asserted rather than compared.
        if n[' datum_deadline '] is not None:
            raise ValueError(" Úloha má deadline. ")
def check_tema():
    """Compare problems of type ``tema`` / ``serial`` between the databases.

    The old query takes the texts and type from ``seminar_problemy`` and the
    ``rocnik_id`` from the joined ``seminar_cisla`` row.  The new query
    starts at ``seminar_temata`` and reads both texts and the ``rocnik_id``
    through the node tables.  The old ``typ`` is paired with ``tema_typ``.

    Raises:
        ValueError: on the first differing field (via ``check_same``).
    """
    old_query = """ SELECT text_zadani, text_reseni, typ, c.rocnik_id AS rocnik_id
FROM seminar_problemy
INNER JOIN seminar_cisla AS c ON c . id = cislo_zadani_id
WHERE typ IN ( ' tema ' , ' serial ' )
ORDER BY seminar_problemy . id """
    new_query = """ SELECT tema_typ, zad_text.na_web AS text_zadani, res_text.na_web AS text_reseni, rn.rocnik_id AS rocnik_id
FROM seminar_temata
- - Problém :
JOIN seminar_problemy AS problem ON problem_ptr_id = problem . id
- - Text :
- - TvCNode má dva potomky , oba TextNode . První drží původní text zadání , druhý řešení .
INNER JOIN seminar_nodes_temavcisle AS tvcn ON tvcn . tema_id = id
INNER JOIN seminar_nodes_treenode AS ttn ON tvcn . treenode_ptr_id = ttn . id
INNER JOIN seminar_nodes_treenode AS zad_tn ON ttn . first_child_id = zad_tn . id
INNER JOIN seminar_nodes_treenode AS res_tn ON zad_tn . succ_id = res_tn . id
INNER JOIN seminar_nodes_obsah AS zad_on ON zad_on . treenode_ptr_id = zad_tn . id
INNER JOIN seminar_nodes_obsah AS res_on ON res_on . treenode_ptr_id = res_tn . id
INNER JOIN seminar_texty AS zad_text ON zad_on . text_id = zad_text . id
INNER JOIN seminar_texty AS res_text ON res_on . text_id = res_text . id
- - Ročník tématu :
- - Podle rootu TvCN
INNER JOIN seminar_nodes_rocnik AS rn ON ttn . root_id = rn . treenode_ptr_id
ORDER BY problem_ptr_id """

    shared = [' text_zadani ', ' text_reseni ', ' rocnik_id ']
    # (old name, new name) pairs.
    renames = [
        (' typ ', ' tema_typ '),
    ]
    old_columns = shared + [old_name for old_name, _ in renames]
    new_columns = shared + [new_name for _, new_name in renames]

    for old_row, new_row in zip(*execute_simple(old_query, new_query)):
        check_same(old_row, new_row, old_columns, new_columns)
def check_konfera():
    """Assert that neither database contains a problem of type ``konfera``.

    Raises:
        ValueError: if either query reports a non-zero ``rowcount``.
    """
    oldcur.execute(" SELECT * FROM seminar_problemy WHERE typ = ' konfera ' ")
    newcur.execute(" SELECT * FROM seminar_konfera AS k JOIN seminar_problemy AS problem ON k.problem_ptr_id = problem.id ")
    both_empty = oldcur.rowcount == 0 and newcur.rowcount == 0
    if not both_empty:
        raise ValueError(' There exists a Konfera! ')
def check_org_clanek():
    """Assert that the old database has no problem of type ``org-clanek``.

    Raises:
        ValueError: if the query reports a non-zero ``rowcount``.
    """
    oldcur.execute(" SELECT * FROM seminar_problemy WHERE typ = ' org-clanek ' ")
    if oldcur.rowcount == 0:
        return
    raise ValueError(' There exists a Org-clanek! ')
def check_res_clanek():
    """Compare problems of type ``res-clanek`` between the databases.

    Two (!) articles have a text (zadání) that has to be preserved.  The new
    query starts at ``seminar_clanky`` and reads the text through the
    hodnocení, řešení and node tables.  The old ``cislo_zadani_id`` is paired
    with ``cislo_id``.

    Raises:
        ValueError: on the first differing field (via ``check_same``).
    """
    old_query = " SELECT * FROM seminar_problemy WHERE typ = ' res-clanek ' ORDER BY id "
    new_query = """ SELECT cislo_id, text.na_web AS text_zadani
FROM seminar_clanky
JOIN seminar_problemy AS problem ON problem_ptr_id = problem . id
INNER JOIN seminar_hodnoceni AS hodn ON problem . id = hodn . problem_id
INNER JOIN seminar_reseni AS rese ON rese . id = hodn . reseni_id
INNER JOIN seminar_nodes_otistene_reseni AS resnode ON resnode . reseni_id = rese . id
INNER JOIN seminar_nodes_treenode AS tn ON resnode . treenode_ptr_id = tn . id
INNER JOIN seminar_nodes_obsah AS son ON son . treenode_ptr_id = tn . first_child_id
INNER JOIN seminar_texty AS text ON text . id = son . text_id
ORDER BY problem_ptr_id """

    shared = [' text_zadani ']
    # (old name, new name) pairs.
    renames = [
        (' cislo_zadani_id ', ' cislo_id '),
    ]
    old_columns = shared + [pair[0] for pair in renames]
    new_columns = shared + [pair[1] for pair in renames]

    old_rows, new_rows = execute_simple(old_query, new_query)
    for old_row, new_row in zip(old_rows, new_rows):
        check_same(old_row, new_row, old_columns, new_columns)
def check_untyped_problem():
    """Assert that every old problem has one of the known types.

    Raises:
        ValueError: if any row of ``seminar_problemy`` in the old database
            has a ``typ`` outside the known list, or a NULL ``typ``.
    """
    # ``typ NOT IN (...)`` evaluates to NULL for a NULL typ, so such rows are
    # not matched by it; ``OR typ IS NULL`` catches them explicitly.
    old_query = " SELECT * FROM seminar_problemy WHERE typ NOT IN ( ' uloha ' , ' tema ' , ' serial ' , ' konfera ' , ' org-clanek ' , ' res-clanek ' ) OR typ IS NULL "
    oldcur.execute(old_query)
    if oldcur.rowcount != 0:
        raise ValueError(' There exists a Problem without type! ')
# Run every check in its original order.  Nothing here catches exceptions,
# so the first one raised by a check propagates and stops the run.
for _check in (
    check_skola,
    check_resitel,
    check_reseni,
    check_organizator,
    check_rocnik,
    check_cislo,
    check_priloha_reseni,
    check_soustredeni,
    check_soustredeni_ucastnici,
    check_soustredeni_organizatori,
    check_nastaveni,
    check_novinky,
    check_pohadka,
    check_problem_common,
    check_uloha,
    check_tema,
    check_konfera,
    check_org_clanek,
    check_res_clanek,
    check_untyped_problem,
):
    _check()