#!/usr/bin/python3
"""Check that selected tables hold the same data in two databases.

The same (or an equivalent) query is run against OLD_DB and NEW_DB and the
resulting rows are compared field by field. Any difference raises ValueError.
"""

import psycopg2
import psycopg2.extras

OLD_DB = "mam_old"
NEW_DB = "mamweb"

oldconn = psycopg2.connect(f"dbname={OLD_DB}")
newconn = psycopg2.connect(f"dbname={NEW_DB}")

# DictCursor rows can be indexed by column name, which check_same relies on.
oldcur = oldconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
newcur = newconn.cursor(cursor_factory=psycopg2.extras.DictCursor)


def execute_simple(old_query, new_query=None):
    """Run a query on both databases and return both result sets.

    Uses the module-level cursors ``oldcur`` and ``newcur``.

    Args:
        old_query: SQL executed on the old database.
        new_query: SQL executed on the new database; defaults to
            ``old_query`` when None.

    Returns:
        A tuple ``(old_rows, new_rows)`` with the fetched rows.

    Raises:
        ValueError: If the two cursors report different row counts.
    """
    if new_query is None:
        new_query = old_query
    oldcur.execute(old_query)
    newcur.execute(new_query)
    if oldcur.rowcount != newcur.rowcount:
        raise ValueError(
            f"Queries '{old_query}' and '{new_query}' returned different "
            f"number of rows ({oldcur.rowcount} and {newcur.rowcount})"
        )
    return oldcur.fetchall(), newcur.fetchall()


def check_same(old_row, new_row, old_fields, new_fields=None):
    """Verify that the given fields hold equal values in both rows.

    Args:
        old_row: Row from the old database, indexable by field.
        new_row: Row from the new database, indexable by field.
        old_fields: A single field or a list/tuple of fields to read from
            ``old_row``.
        new_fields: The corresponding field(s) in ``new_row``; defaults to
            ``old_fields`` when None.

    Returns:
        True when every compared pair of values is equal.

    Raises:
        ValueError: If the field lists differ in length or any pair of
            values differs.
    """
    # A lone field (e.g. a column name) is treated as a one-element list.
    if not isinstance(old_fields, (list, tuple)):
        old_fields = [old_fields]
    if new_fields is None:
        new_fields = old_fields
    elif not isinstance(new_fields, (list, tuple)):
        new_fields = [new_fields]

    # zip() would silently drop the unmatched tail and skip those checks.
    if len(old_fields) != len(new_fields):
        raise ValueError(
            f"Field lists have different lengths "
            f"({len(old_fields)} and {len(new_fields)})"
        )

    for old_field, new_field in zip(old_fields, new_fields):
        if old_row[old_field] != new_row[new_field]:
            raise ValueError(
                f"Fields '{old_field}' and '{new_field}' differs "
                f"for rows '{old_row}' and '{new_row}'"
            )
    return True


def check_skola():
    """Compare every row of ``seminar_skoly`` between the two databases.

    Rows are fetched ordered by ``id`` so they pair up positionally.
    Raises ValueError (from the helpers) on any mismatch.
    """
    query = "SELECT * FROM seminar_skoly ORDER BY id"
    fields = [
        'id', 'aesop_id', 'izo', 'nazev', 'kratky_nazev', 'ulice',
        'mesto', 'psc', 'stat', 'je_zs', 'je_ss', 'poznamka',
    ]
    old_res, new_res = execute_simple(query, query)
    for old_row, new_row in zip(old_res, new_res):
        check_same(old_row, new_row, fields)


check_skola()