#!/usr/bin/python3
import psycopg2
import psycopg2 . extras
OLD_DB = " mam_old "
NEW_DB = " mamweb "
oldconn = psycopg2 . connect ( f " dbname= { OLD_DB } " )
newconn = psycopg2 . connect ( f " dbname= { NEW_DB } " )
oldcur = oldconn . cursor ( cursor_factory = psycopg2 . extras . DictCursor )
newcur = newconn . cursor ( cursor_factory = psycopg2 . extras . DictCursor )
# Uses global variables oldcur, newcur!
def execute_simple ( old_query , new_query = None ) :
if new_query is None :
new_query = old_query
oldcur . execute ( old_query )
newcur . execute ( new_query )
if oldcur . rowcount != newcur . rowcount :
raise ValueError ( f " Queries ' { old_query } ' and ' { new_query } ' returned different number of rows ( { oldcur . rowcount } and { newcur . rowcount } ) " )
return ( oldcur . fetchall ( ) , newcur . fetchall ( ) )
def check_same ( old_row , new_row , old_fields , new_fields = None ) :
if type ( old_fields ) != list :
old_fields = [ old_fields ]
if new_fields is None :
new_fields = old_fields
fields = zip ( old_fields , new_fields )
for old_field , new_field in fields :
if old_row [ old_field ] == new_row [ new_field ] :
continue
raise ValueError ( f " Fields ' { old_field } ' and ' { new_field } ' differs for rows ' { old_row } ' and ' { new_row } ' " )
return True
def get_user_id_for_org_id ( org_id ) :
query = """ SELECT auth_user.id FROM auth_user
INNER JOIN seminar_osoby ON seminar_osoby . user_id = auth_user . id
INNER JOIN seminar_organizator ON seminar_organizator . osoba_id = seminar_osoby . id
WHERE seminar_organizator . id = % s """
newcur . execute ( query , ( org_id , ) )
return newcur . fetchone ( ) [ ' id ' ]
def check_skola ( ) :
old_query = " SELECT * FROM seminar_skoly ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' aesop_id ' , ' izo ' , ' nazev ' , ' kratky_nazev ' , ' ulice ' , ' mesto ' , ' psc ' , ' stat ' , ' je_zs ' , ' je_ss ' , ' poznamka ' ] )
def check_resitel ( ) :
old_query = ' SELECT * FROM seminar_resitele ORDER BY id '
new_query = ' SELECT * FROM seminar_resitele JOIN seminar_osoby ON seminar_resitele.osoba_id = seminar_osoby.id ORDER BY seminar_resitele.id '
old_res , new_res = execute_simple ( old_query , new_query )
res = zip ( old_res , new_res )
fields_osoba = [
' jmeno ' ,
' prijmeni ' ,
' user ' ,
' pohlavi_muz ' ,
' email ' ,
' telefon ' ,
' datum_narozeni ' ,
' datum_souhlasu_udaje ' ,
' datum_souhlasu_zasilani ' ,
' datum_prihlaseni ' ,
' ulice ' ,
' mesto ' ,
' psc ' ,
' stat ' ,
]
fields_keep = [
' id ' ,
' skola_id ' ,
' rok_maturity ' ,
' zasilat ' ,
' poznamka ' ,
]
fields_old = fields_keep + fields_osoba
fields_new = fields_keep + [ ' seminar_osoby. ' + f for f in fields_osoba ]
for o , n in res :
check_same ( o , n , fields_old , fields_new )
def check_reseni ( ) :
old_query = ' SELECT * FROM seminar_reseni ORDER BY id '
new_query = ' SELECT * FROM seminar_reseni JOIN seminar_hodnoceni AS hodnoceni ON hodnoceni_id = hodnoceni.id ORDER BY id '
same_fields = [ ' id ' , ' forma ' , ' poznamka ' ]
renamed_fields = [ ( ' timestamp ' , ' cas_doruceni ' ) ,
# Also moved fields
( ' problem_id ' , ' hodnoceni.problem_id ' ) ,
( ' body ' , ' hodnoceni.body ' ) ,
( ' cislo_body_id ' , ' hodnoceni.cislo_body_id ' ) ,
]
old_fields = same_fields + [ f [ 0 ] for f in renamed_fields ]
new_fields = same_fields + [ f [ 1 ] for f in renamed_fields ]
old_res , new_res = execute_simple ( old_query , new_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , old_fields , new_fields )
# Řešitelé jsou nově m2m, takže je musíme dohledat
old_query = ' SELECT id, resitel_id FROM seminar_reseni ORDER BY id '
new_query = ' SELECT reseni_id, resitel_id FROM seminar_reseni_resitele ORDER BY reseni_id '
oldcur = oldconn . cursor ( )
old_results = oldcur . execute ( old_query ) . fetchall ( )
newcur = newconn . cursor ( )
new_results = newcur . execute ( old_query ) . fetchall ( )
for oldr in old_results :
if oldr not in new_results :
raise ValueError ( f ' Pair { oldr } not found in new db. ' )
def check_organizator ( ) :
old_query = ' SELECT * FROM seminar_organizatori ORDER BY id '
new_query = ' SELECT * FROM seminar_organizatori JOIN seminar_osoby AS osoba ON osoba_id = osoba.id JOIN auth_user AS user ON osoba.user_id = user.id ORDER BY id '
same_fields = [ ' studuje ' , ' strucny_popis_organizatora ' ]
renamed_fields = [
( ' user_id ' , ' user.id ' ) ,
( ' prezdivka ' , ' osoba.prezdivka ' ) ,
( ' foto ' , ' osoba.foto ' ) ,
( ' foto_male ' , ' osoba.foto_male ' ) ,
]
old_fields = same_fields + [ f [ 0 ] for f in renamed_fields ]
new_fields = same_fields + [ f [ 1 ] for f in renamed_fields ]
old_res , new_res = execute_simple ( old_query , new_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , old_fields , new_fields )
# organizuje od, do:
if o . organizuje_od_roku != n . organizuje_od . year :
raise ValueError ( f ' Not matching organizuje_od for org id= { o . id } : old { o . organizuje_od_roku } , new { n . organizuje_od } ' )
if o . organizuje_do_roku != n . organizuje_do . year :
raise ValueError ( f ' Not matching organizuje_do for org id= { o . id } : old { o . organizuje_do_roku } , new { n . organizuje_do } ' )
def check_rocnik ( ) :
old_query = " SELECT * FROM seminar_rocniky ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' prvni_rok ' , ' rocnik ' , ' exportovat ' ] )
def check_cislo ( ) :
old_query = " SELECT * FROM seminar_cisla ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' rocnik_id ' , ' cislo ' , ' datum_vydani ' , ' datum_deadline ' , ' verejne ' , ' poznamka ' , ' pdf ' ] ,
[ ' id ' , ' rocnik_id ' , ' poradi ' , ' datum_vydani ' , ' datum_deadline ' , ' verejne ' , ' poznamka ' , ' pdf ' ] )
def check_priloha_reseni ( ) :
old_query = " SELECT * FROM seminar_priloha_reseni "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' reseni_id ' , ' timestamp ' , ' soubor ' , ' poznamka ' ] ,
[ ' id ' , ' reseni_id ' , ' vytvoreno ' , ' soubor ' , ' poznamka ' ] )
def check_soustredeni ( ) :
old_query = " SELECT * FROM seminar_soustredeni ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' rocnik_id ' , ' datum_zacatku ' , ' datum_konce ' , ' verejne ' , ' misto ' , ' text ' , ' typ ' , ' exportovat ' ] )
#Kontrola ucasnici, organizatori v samostatnych funkcich
def check_soustredeni_ucastnici ( ) :
old_query = " SELECT * FROM seminar_soustredeni_ucastnici ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' resitel_id ' , ' soustredeni_id ' , ' poznamka ' ] )
def check_soustredeni_organizatori ( ) :
old_query = " SELECT * FROM seminar_soustredeni_organizatori ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' organizator_id ' , ' soustredeni_id ' , ' poznamka ' ] )
def check_nastaveni ( ) :
old_query = " SELECT * FROM seminar_nastaveni ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' aktualni_cislo_id ' ] )
def check_novinky ( ) :
old_query = " SELECT * FROM seminar_novinky ORDER BY id "
old_res , new_res = execute_simple ( old_query )
res = zip ( old_res , new_res )
for o , n in res :
check_same ( o , n , [ ' id ' , ' datum ' , ' text ' , ' obrazek ' , ' zverejneno ' ] )
if get_user_id_for_org_id ( n [ ' autor_id ' ] ) != o [ ' autor_id ' ] :
raise ValueError ( " Nesedi autori u novinek " )
# Problémy jsou rozdělené podle typů:
def check_uloha ( ) :
raise NotImplementedError ( )
def check_tema ( ) :
raise NotImplementedError ( )
def check_konfera ( ) :
old_query = " SELECT * FROM seminar_problemy WHERE typ = ' konfera ' "
new_query = " SELECT * FROM seminar_konfera JOIN seminar_problemy as problem ON problem_ptr_id = problem.id "
oldcur . execute ( old_query )
newcur . execute ( new_query )
if oldcur . rowcount != 0 or newcur . rowcount != 0 :
raise ValueError ( ' There exists a Konfera! ' )
def check_org_clanek ( ) :
old_query = " SELECT * FROM seminar_problemy WHERE typ = ' org-clanek ' "
oldcur . execute ( old_query )
if oldcur . rowcount != 0 :
raise ValueError ( ' There exists a Org-clanek! ' )
def check_res_clanek ( ) :
raise NotImplementedError ( )
def check_untyped_problem ( ) :
old_query = " SELECT * FROM seminar_problemy WHERE typ NOT IN ( ' uloha ' , ' tema ' , ' serial ' , ' konfera ' , ' org-clanek ' , ' res-clanek ' ) "
oldcur . execute ( old_query )
if oldcur . rowcount != 0 :
raise ValueError ( ' There exists a Problem without type! ' )
check_skola ( )
check_resitel ( )
check_reseni ( )
check_organizator ( )
check_rocnik ( )
check_cislo ( )
check_priloha_reseni ( )
check_soustredeni ( )
check_soustredeni_ucastnici ( )
check_soustredeni_organizatori ( )
check_nastaveni ( )
check_novinky ( )
check_uloha ( )
check_tema ( )
check_konfera ( )
check_org_clanek ( )
check_res_clanek ( )
check_untyped_problem ( )