#!/usr/bin/python3
"""Compare rows of an old and a new database to sanity-check a migration.

The script connects to the databases named by ``OLD_DB`` and ``NEW_DB``,
runs a pair of queries per checked table and raises ``ValueError`` on the
first difference it finds.  All checks run when the script is executed.
"""

import psycopg2
import psycopg2.extras

OLD_DB = "mam_old"
NEW_DB = "mamweb"

oldconn = psycopg2.connect(f"dbname={OLD_DB}")
newconn = psycopg2.connect(f"dbname={NEW_DB}")
oldcur = oldconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
newcur = newconn.cursor(cursor_factory=psycopg2.extras.DictCursor)


# Uses global variables oldcur, newcur!
def execute_simple(old_query, new_query=None):
    """Run one query on each database and return both result sets.

    Args:
        old_query: SQL executed on the old database.
        new_query: SQL executed on the new database; defaults to
            ``old_query``.

    Returns:
        A tuple ``(old_rows, new_rows)`` with everything fetched from the
        two module-level cursors.

    Raises:
        ValueError: if the two queries report a different row count.
    """
    if new_query is None:
        new_query = old_query
    oldcur.execute(old_query)
    newcur.execute(new_query)
    if oldcur.rowcount != newcur.rowcount:
        raise ValueError(f"Queries '{old_query}' and '{new_query}' returned different number of rows ({oldcur.rowcount} and {newcur.rowcount})")
    return oldcur.fetchall(), newcur.fetchall()


def check_same(old_row, new_row, old_fields, new_fields=None):
    """Verify that the listed fields hold equal values in both rows.

    Args:
        old_row: row from the old database, indexable by field name.
        new_row: row from the new database, indexable by field name.
        old_fields: a field name or a list of field names in ``old_row``.
        new_fields: the matching names in ``new_row``; defaults to
            ``old_fields``.

    Returns:
        ``True`` when every pair of fields compares equal.

    Raises:
        ValueError: if the field lists differ in length or any pair of
            values differs.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(old_fields, str):
        old_fields = [old_fields]
    if new_fields is None:
        new_fields = old_fields
    elif isinstance(new_fields, str):
        new_fields = [new_fields]
    # zip() would silently drop the unmatched tail and skip those checks.
    if len(old_fields) != len(new_fields):
        raise ValueError(
            f"Field lists differ in length: {old_fields} and {new_fields}"
        )
    for old_field, new_field in zip(old_fields, new_fields):
        if old_row[old_field] != new_row[new_field]:
            raise ValueError(f"Fields '{old_field}' and '{new_field}' differs for rows '{old_row}' and '{new_row}'")
    return True


def _select_columns(default_table, fields):
    """Build a SELECT list whose result column names equal ``fields``.

    DictCursor rows are keyed by the bare result column name, so a key such
    as ``'osoba.foto'`` only exists if the column is aliased to exactly that
    text.  Unqualified names are read from ``default_table``, which also
    keeps same-named columns of joined tables from overwriting each other.
    Every identifier is double-quoted so that reserved words (e.g. ``user``)
    are accepted.
    """
    columns = []
    for field in fields:
        table, _, column = field.rpartition('.')
        columns.append(f'"{table or default_table}"."{column}" AS "{field}"')
    return ', '.join(columns)


def _year_or_none(value):
    """Return ``value.year``, or ``None`` when the value itself is NULL."""
    return None if value is None else value.year


def check_skola():
    """Compare ``seminar_skoly`` rows, which use the same columns in both DBs."""
    old_query = "SELECT * FROM seminar_skoly ORDER BY id"
    new_query = old_query
    old_res, new_res = execute_simple(old_query, new_query)
    fields = [
        'id', 'aesop_id', 'izo', 'nazev', 'kratky_nazev', 'ulice',
        'mesto', 'psc', 'stat', 'je_zs', 'je_ss', 'poznamka',
    ]
    for o, n in zip(old_res, new_res):
        check_same(o, n, fields)


def check_resitel():
    """Compare ``seminar_resitele`` rows with the new resitel + osoba join.

    NOTE(review): the field names are taken as-is from the original lists;
    confirm each one (notably ``user``) against the real schema.
    """
    fields_osoba = [
        'jmeno',
        'prijmeni',
        'user',
        'pohlavi_muz',
        'email',
        'telefon',
        'datum_narozeni',
        'datum_souhlasu_udaje',
        'datum_souhlasu_zasilani',
        'datum_prihlaseni',
        'ulice',
        'mesto',
        'psc',
        'stat',
    ]
    fields_keep = [
        'id',
        'skola_id',
        'rok_maturity',
        'zasilat',
        'poznamka',
    ]
    fields_old = fields_keep + fields_osoba
    fields_new = fields_keep + ['seminar_osoby.' + f for f in fields_osoba]

    old_query = 'SELECT * FROM seminar_resitele ORDER BY id'
    new_columns = _select_columns('seminar_resitele', fields_new)
    new_query = (
        f'SELECT {new_columns} '
        'FROM seminar_resitele '
        'JOIN seminar_osoby ON seminar_resitele.osoba_id = seminar_osoby.id '
        'ORDER BY seminar_resitele.id'
    )
    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, fields_old, fields_new)


def check_reseni():
    """Compare ``seminar_reseni`` rows and their solver assignments.

    First the scalar fields are compared against the new reseni + hodnoceni
    join, then every old ``(id, resitel_id)`` pair is looked up in the new
    ``seminar_reseni_resitele`` table.

    Raises:
        ValueError: on the first differing field or missing pair.
    """
    same_fields = ['id', 'forma', 'poznamka']
    renamed_fields = [
        ('timestamp', 'cas_doruceni'),
        # Also moved fields
        ('problem_id', 'hodnoceni.problem_id'),
        ('body', 'hodnoceni.body'),
        ('cislo_body_id', 'hodnoceni.cislo_body_id'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]

    old_query = 'SELECT * FROM seminar_reseni ORDER BY id'
    new_columns = _select_columns('seminar_reseni', new_fields)
    # ORDER BY is qualified: a bare "id" is ambiguous once two tables that
    # both have an id column are joined.
    new_query = (
        f'SELECT {new_columns} '
        'FROM seminar_reseni '
        'JOIN seminar_hodnoceni AS hodnoceni ON hodnoceni_id = hodnoceni.id '
        'ORDER BY seminar_reseni.id'
    )
    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)

    # Solvers are now a many-to-many relation, so they have to be looked up
    # in the separate link table.
    old_query = 'SELECT id, resitel_id FROM seminar_reseni ORDER BY id'
    new_query = 'SELECT reseni_id, resitel_id FROM seminar_reseni_resitele ORDER BY reseni_id'
    # Plain (tuple-returning) cursors so that rows can be compared and hashed
    # directly.  execute() returns None, so fetchall() is a separate call.
    with oldconn.cursor() as old_plain:
        old_plain.execute(old_query)
        old_results = old_plain.fetchall()
    with newconn.cursor() as new_plain:
        new_plain.execute(new_query)
        new_results = set(new_plain.fetchall())
    for oldr in old_results:
        if oldr not in new_results:
            raise ValueError(f'Pair {oldr} not found in new db.')


def check_organizator():
    """Compare ``seminar_organizatori`` rows with the new org + osoba + user join.

    Besides the directly comparable fields, the old ``organizuje_od_roku`` /
    ``organizuje_do_roku`` values are compared with the ``.year`` of the new
    ``organizuje_od`` / ``organizuje_do`` values (a NULL on the new side is
    treated as year ``None``).

    Raises:
        ValueError: on the first differing field or year.
    """
    same_fields = ['studuje', 'strucny_popis_organizatora']
    renamed_fields = [
        ('user_id', 'user.id'),
        ('prezdivka', 'osoba.prezdivka'),
        ('foto', 'osoba.foto'),
        ('foto_male', 'osoba.foto_male'),
    ]
    old_fields = same_fields + [f[0] for f in renamed_fields]
    new_fields = same_fields + [f[1] for f in renamed_fields]
    year_fields = [
        ('organizuje_od_roku', 'organizuje_od'),
        ('organizuje_do_roku', 'organizuje_do'),
    ]

    old_query = 'SELECT * FROM seminar_organizatori ORDER BY id'
    new_columns = _select_columns(
        'seminar_organizatori',
        new_fields + [new_name for _, new_name in year_fields],
    )
    # "user" is a reserved word in PostgreSQL and must be quoted to serve as
    # a table alias; ORDER BY is qualified because all three tables are
    # joined on their own id columns.
    new_query = (
        f'SELECT {new_columns} '
        'FROM seminar_organizatori '
        'JOIN seminar_osoby AS osoba ON osoba_id = osoba.id '
        'JOIN auth_user AS "user" ON osoba.user_id = "user".id '
        'ORDER BY seminar_organizatori.id'
    )
    old_res, new_res = execute_simple(old_query, new_query)
    for o, n in zip(old_res, new_res):
        check_same(o, n, old_fields, new_fields)
        # organizuje od, do: DictCursor rows are indexed by key, they do not
        # expose columns as attributes.
        org_id = o['id']
        for old_name, new_name in year_fields:
            old_year = o[old_name]
            new_value = n[new_name]
            if old_year != _year_or_none(new_value):
                raise ValueError(f'Not matching {new_name} for org id={org_id}: old {old_year}, new {new_value}')


check_skola()
check_resitel()
check_reseni()
check_organizator()