#!/usr/bin/python3 import psycopg2 import psycopg2.extras OLD_DB = "mam_old" NEW_DB = "mamweb" oldconn = psycopg2.connect(f"dbname={OLD_DB}") newconn = psycopg2.connect(f"dbname={NEW_DB}") oldcur = oldconn.cursor(cursor_factory=psycopg2.extras.DictCursor) newcur = newconn.cursor(cursor_factory=psycopg2.extras.DictCursor) # Uses global variables oldcur, newcur! def execute_simple(old_query, new_query=None): if new_query is None: new_query = old_query oldcur.execute(old_query) newcur.execute(new_query) if oldcur.rowcount != newcur.rowcount: raise ValueError(f"Queries '{old_query}' and '{new_query}' returned different number of rows ({oldcur.rowcount} and {newcur.rowcount})") return(oldcur.fetchall(), newcur.fetchall()) def check_same(old_row, new_row, old_fields, new_fields=None): if type(old_fields) != list: old_fields = [old_fields] if new_fields is None: new_fields = old_fields fields = zip(old_fields, new_fields) for old_field, new_field in fields: if old_row[old_field] == new_row[new_field]: continue raise ValueError(f"Fields '{old_field}' and '{new_field}' differs for rows '{old_row}' and '{new_row}'") return True def get_user_id_for_org_id(org_id): query = """SELECT auth_user.id FROM auth_user INNER JOIN seminar_osoby ON seminar_osoby.user_id = auth_user.id INNER JOIN seminar_organizator ON seminar_organizator.osoba_id = seminar_osoby.id WHERE seminar_organizator.id = %s """ newcur.execute(query,(org_id,)) return newcur.fetchone()['id'] def check_skola(): old_query = "SELECT * FROM seminar_skoly ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','aesop_id','izo','nazev','kratky_nazev','ulice','mesto','psc','stat','je_zs','je_ss','poznamka']) def check_resitel(): old_query = 'SELECT * FROM seminar_resitele ORDER BY id' new_query = 'SELECT * FROM seminar_resitele JOIN seminar_osoby ON seminar_resitele.osoba_id = seminar_osoby.id ORDER BY seminar_resitele.id' old_res, new_res = execute_simple(old_query,new_query) res = zip(old_res,new_res) fields_osoba = [ 'jmeno', 'prijmeni', 'user', 'pohlavi_muz', 'email', 'telefon', 'datum_narozeni', 'datum_souhlasu_udaje', 'datum_souhlasu_zasilani', 'datum_prihlaseni', 'ulice', 'mesto', 'psc', 'stat', ] fields_keep = [ 'id', 'skola_id', 'rok_maturity', 'zasilat', 'poznamka', ] fields_old = fields_keep+fields_osoba fields_new = fields_keep + ['seminar_osoby.'+f for f in fields_osoba] for o,n in res: check_same(o,n,fields_old, fields_new) def check_reseni(): old_query = 'SELECT * FROM seminar_reseni ORDER BY id' new_query = 'SELECT * FROM seminar_reseni JOIN seminar_hodnoceni AS hodnoceni ON hodnoceni_id = hodnoceni.id ORDER BY id' same_fields = ['id', 'forma', 'poznamka'] renamed_fields = [('timestamp', 'cas_doruceni'), # Also moved fields ('problem_id', 'hodnoceni.problem_id'), ('body', 'hodnoceni.body'), ('cislo_body_id', 'hodnoceni.cislo_body_id'), ] old_fields = same_fields + [f[0] for f in renamed_fields] new_fields = same_fields + [f[1] for f in renamed_fields] old_res, new_res = execute_simple(old_query,new_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,old_fields, new_fields) # Řešitelé jsou nově m2m, takže je musíme dohledat old_query = 'SELECT id, resitel_id FROM seminar_reseni ORDER BY id' new_query = 'SELECT reseni_id, resitel_id FROM seminar_reseni_resitele ORDER BY reseni_id' oldcur = oldconn.cursor() old_results = oldcur.execute(old_query).fetchall() newcur = newconn.cursor() new_results = newcur.execute(old_query).fetchall() for oldr in old_results: if oldr not in new_results: raise ValueError(f'Pair {oldr} not found in new db.') def check_organizator(): old_query = 'SELECT * FROM seminar_organizatori ORDER BY id' new_query = 'SELECT * FROM seminar_organizatori JOIN seminar_osoby AS osoba ON osoba_id = osoba.id JOIN auth_user AS user ON osoba.user_id = user.id ORDER BY id' same_fields = ['studuje', 'strucny_popis_organizatora'] renamed_fields = [ ('user_id', 'user.id'), ('prezdivka', 'osoba.prezdivka'), ('foto', 'osoba.foto'), ('foto_male', 'osoba.foto_male'), ] old_fields = same_fields + [f[0] for f in renamed_fields] new_fields = same_fields + [f[1] for f in renamed_fields] old_res, new_res = execute_simple(old_query,new_query) res = zip(old_res, new_res) for o,n in res: check_same(o,n,old_fields, new_fields) # organizuje od, do: if o.organizuje_od_roku != n.organizuje_od.year: raise ValueError(f'Not matching organizuje_od for org id={o.id}: old {o.organizuje_od_roku}, new {n.organizuje_od}') if o.organizuje_do_roku != n.organizuje_do.year: raise ValueError(f'Not matching organizuje_do for org id={o.id}: old {o.organizuje_do_roku}, new {n.organizuje_do}') def check_rocnik(): old_query = "SELECT * FROM seminar_rocniky ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','prvni_rok', 'rocnik', 'exportovat']) def check_cislo(): old_query = "SELECT * FROM seminar_cisla ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n, ['id','rocnik_id','cislo', 'datum_vydani','datum_deadline','verejne','poznamka','pdf'], ['id','rocnik_id','poradi','datum_vydani','datum_deadline','verejne','poznamka','pdf']) def check_priloha_reseni(): old_query = "SELECT * FROM seminar_priloha_reseni" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n, ['id','reseni_id', 'timestamp', 'soubor', 'poznamka'], ['id','reseni_id', 'vytvoreno', 'soubor', 'poznamka']) def check_soustredeni(): old_query = "SELECT * FROM seminar_soustredeni ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','rocnik_id','datum_zacatku','datum_konce','verejne','misto','text','typ','exportovat']) #Kontrola ucasnici, organizatori v samostatnych funkcich def check_soustredeni_ucastnici(): old_query = "SELECT * FROM seminar_soustredeni_ucastnici ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','resitel_id','soustredeni_id','poznamka']) def check_soustredeni_organizatori(): old_query = "SELECT * FROM seminar_soustredeni_organizatori ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','organizator_id','soustredeni_id','poznamka']) def check_nastaveni(): old_query = "SELECT * FROM seminar_nastaveni ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','aktualni_cislo_id']) def check_novinky(): old_query = "SELECT * FROM seminar_novinky ORDER BY id" old_res, new_res = execute_simple(old_query) res = zip(old_res,new_res) for o,n in res: check_same(o,n,['id','datum','text','obrazek','zverejneno']) if get_user_id_for_org_id(n['autor_id']) != o['autor_id']: raise ValueError("Nesedi autori u novinek") check_skola() check_resitel() check_reseni() check_organizator() check_rocnik() check_cislo() check_priloha_reseni() check_soustredeni() check_soustredeni_ucastnici() check_soustredeni_organizatori() check_nastaveni() check_novinky()