513 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			513 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| import psycopg2
 | |
| import psycopg2.extras
 | |
| 
 | |
# Names of the two PostgreSQL databases that are compared against each other.
OLD_DB = "mam_old"
NEW_DB = "mamweb"

# One connection per database, opened in the order old, new.
oldconn, newconn = (
	psycopg2.connect(f"dbname={db_name}") for db_name in (OLD_DB, NEW_DB)
)

# DictCursor is used so that the checks below can read columns by name.
oldcur, newcur = (
	conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
	for conn in (oldconn, newconn)
)
 | |
| 
 | |
| 
 | |
| # Uses global variables oldcur, newcur!
 | |
def execute_simple(old_query, new_query=None):
	"""Run one query on each database and hand back both result sets.

	Works on the module-level cursors ``oldcur`` and ``newcur``.

	Args:
		old_query: SQL executed on the old database.
		new_query: SQL executed on the new database. When left as None,
			``old_query`` is executed there as well.

	Returns:
		A tuple ``(old_rows, new_rows)`` with everything fetched from the
		old and the new cursor respectively.

	Raises:
		ValueError: if the two cursors report a different number of rows.
	"""
	query_for_new = old_query if new_query is None else new_query

	oldcur.execute(old_query)
	newcur.execute(query_for_new)

	old_count, new_count = oldcur.rowcount, newcur.rowcount
	if old_count != new_count:
		raise ValueError(f"Queries '{old_query}' and '{query_for_new}' returned different number of rows ({old_count} and {new_count})")

	old_rows = oldcur.fetchall()
	new_rows = newcur.fetchall()
	return old_rows, new_rows
 | |
| 
 | |
def check_same(old_row, new_row, old_fields, new_fields=None):
	"""Check that selected columns hold equal values in an old and a new row.

	Args:
		old_row: Row from the old database, indexable by column name.
		new_row: Row from the new database, indexable by column name.
		old_fields: A single column name, or a list/tuple of column names,
			to read from ``old_row``.
		new_fields: The corresponding column name(s) in ``new_row``, matched
			position by position. Defaults to ``old_fields``.

	Returns:
		True when every compared pair of values is equal.

	Raises:
		ValueError: on the first pair of values that differs, or when the
			two field lists do not have the same length.
	"""
	def as_list(fields):
		# A bare column name is shorthand for a one-element list.
		if isinstance(fields, (list, tuple)):
			return list(fields)
		return [fields]

	old_fields = as_list(old_fields)
	# new_fields gets the same normalisation; otherwise zip() would iterate
	# over the characters of a bare string.
	new_fields = old_fields if new_fields is None else as_list(new_fields)

	# zip() stops at the shorter list, which would silently skip checks.
	if len(old_fields) != len(new_fields):
		raise ValueError(f"Field lists differ in length: {old_fields} and {new_fields}")

	for old_field, new_field in zip(old_fields, new_fields):
		if old_row[old_field] == new_row[new_field]:
			continue
		raise ValueError(f"Fields '{old_field}'({old_row[old_field]}) and '{new_field}'({new_row[new_field]}) differs for rows \n'{old_row}' and \n'{new_row}'")
	return True
 | |
| 
 | |
def get_user_id_for_org_id(org_id):
	"""Look up, in the new database, the auth_user id behind an organizer id.

	Uses the module-level cursor ``newcur``.

	Args:
		org_id: Value of ``seminar_organizator.id`` in the new database.

	Returns:
		The ``auth_user.id`` joined to that organizer through
		``seminar_osoby``, or None when the query yields no row (which is
		also what happens for ``org_id=None``, since ``id = NULL`` never
		matches).
	"""
	query = """SELECT auth_user.id FROM auth_user 
		INNER JOIN seminar_osoby ON seminar_osoby.user_id = auth_user.id
		INNER JOIN seminar_organizator ON seminar_organizator.osoba_id = seminar_osoby.id
		WHERE seminar_organizator.id = %s """

	newcur.execute(query, (org_id,))
	row = newcur.fetchone()
	# fetchone() gives None when nothing matched; subscripting it would fail
	# with an unhelpful TypeError, so let the caller's comparison report it.
	if row is None:
		return None
	return row['id']
 | |
| 	
 | |
| 
 | |
| 
 | |
| 
 | |
def check_skola():
	"""Compare the seminar_skoly table between the old and the new database.

	Rows are read from both databases ordered by id and compared pairwise
	on the columns listed below.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = [
		'id', 'aesop_id', 'izo', 'nazev', 'kratky_nazev', 'ulice',
		'mesto', 'psc', 'stat', 'je_zs', 'je_ss', 'poznamka',
	]
	old_rows, new_rows = execute_simple("SELECT * FROM seminar_skoly ORDER BY id")
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
 | |
| 
 | |
def check_resitel():
	"""Compare solvers (seminar_resitele) between the old and the new database.

	In the new database the personal columns are read from seminar_osoby
	(joined through osoba_id) and aliased back to the old column names, so
	both sides can be compared with a single list of fields.

	E-mail is not part of the strict comparison: a differing e-mail only
	prints a warning, and only when the old e-mail is not empty.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	sql_old = 'SELECT * FROM seminar_resitele ORDER BY id'
	sql_new = '''SELECT seminar_resitele.id, skola_id, rok_maturity, zasilat, seminar_resitele.poznamka,
	o.jmeno AS jmeno, o.prijmeni AS prijmeni, o.user_id AS user_id, o.pohlavi_muz AS pohlavi_muz, o.email AS email, o.telefon AS telefon, o.datum_narozeni AS datum_narozeni,
	o.datum_souhlasu_udaje AS datum_souhlasu_udaje, o.datum_souhlasu_zasilani AS datum_souhlasu_zasilani, o.datum_registrace AS datum_prihlaseni, o.ulice AS ulice, o.mesto AS mesto, o.psc AS psc, o.stat AS stat
	FROM seminar_resitele JOIN seminar_osoby AS o ON seminar_resitele.osoba_id = o.id ORDER BY seminar_resitele.id'''

	compared_columns = [
		# Columns that are still read from the solver table itself.
		'id',
		'skola_id',
		'rok_maturity',
		'zasilat',
		'poznamka',
		# Columns that the new query takes from seminar_osoby.
		# ('email' is intentionally missing, it is handled separately below.)
		'jmeno',
		'prijmeni',
		'user_id',
		'pohlavi_muz',
		'telefon',
		'datum_narozeni',
		'datum_souhlasu_udaje',
		'datum_souhlasu_zasilani',
		'datum_prihlaseni',
		'ulice',
		'mesto',
		'psc',
		'stat',
	]

	old_rows, new_rows = execute_simple(sql_old, sql_new)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
		old_email, new_email = old_row['email'], new_row['email']
		if old_email == new_email or old_email == '':
			continue
		print(f"WARNING: Emails differ: old: {old_email}, new: {new_email}")
 | |
| 
 | |
def check_reseni():
	"""Compare solutions (seminar_reseni) between the old and the new database.

	Migration 0058 deliberately changes (reduces) the number of solutions so
	that every article has just one solution (with several solvers, which the
	old schema could not express). Because of that two things are checked:

	1. every solver got the right points for every problem;
	2. the details of the solutions that remain were preserved.

	Raises:
		ValueError: when the row counts of part 1 differ, when any compared
			value differs, or when the notes of a solution differ by more
			than surrounding whitespace.
	"""
	# Part 1)
	old_query = 'SELECT * FROM seminar_reseni ORDER BY problem_id, resitel_id, body, timestamp'
	new_query = '''SELECT seminar_reseni.id, forma, seminar_reseni.poznamka, cas_doruceni, hodnoceni.problem_id AS problem_id, hodnoceni.body AS body, hodnoceni.cislo_body_id AS cislo_body_id, res.id AS resitel_id
		FROM seminar_reseni
		JOIN seminar_hodnoceni AS hodnoceni ON seminar_reseni.id = hodnoceni.reseni_id
		JOIN seminar_reseni_resitele AS rr ON seminar_reseni.id = rr.reseni_id
		JOIN seminar_resitele AS res ON res.id = rr.resitele_id
		ORDER BY problem_id, resitel_id, body, cas_doruceni'''

	# After some solutions were merged, delivery times and notes differ, so
	# they are not compared here (timestamp / cas_doruceni is left out).
	compared_fields = ['forma', 'problem_id', 'body', 'cislo_body_id', 'resitel_id']

	old_res, new_res = execute_simple(old_query, new_query)
	for o, n in zip(old_res, new_res):
		check_same(o, n, compared_fields)

	# Part 2)
	# The query differs in that solvers are no longer joined.
	old_query = 'SELECT * FROM seminar_reseni ORDER BY id'
	new_query = '''SELECT seminar_reseni.id, forma, poznamka, cas_doruceni AS timestamp, h.problem_id AS problem_id, h.body AS body, h.cislo_body_id AS cislo_body_id
		FROM seminar_reseni
		JOIN seminar_hodnoceni AS h ON h.reseni_id = seminar_reseni.id
		ORDER BY id'''

	# execute_simple insists on equal row counts, which is not wanted here.
	oldcur.execute(old_query)
	newcur.execute(new_query)
	old_res, new_res = oldcur.fetchall(), newcur.fetchall()

	# For every new solution, check that the old solution with the same id
	# carries the right data. A set keeps the membership test cheap.
	new_ids = {n['id'] for n in new_res}
	surviving_old = [o for o in old_res if o['id'] in new_ids]
	for o, n in zip(surviving_old, new_res):
		# Notes and timestamps should be preserved here. The notes differ
		# in whitespace for some reason, so they are compared separately.
		check_same(o, n, ['id', 'forma', 'timestamp', 'problem_id', 'body', 'cislo_body_id'])
		old_pozn = o['poznamka'].strip()
		new_pozn = n['poznamka'].strip()
		if old_pozn != new_pozn:
			raise ValueError(f'Poznamky se lisi pro radky {dict(o)} a {dict(n)}')
 | |
| 	
 | |
| 
 | |
| 
 | |
def check_organizator():
	"""Compare organizers (seminar_organizator) between the two databases.

	The new query joins seminar_osoby and auth_user so that the user id,
	nickname and photo can be compared with the old organizer row. The old
	``organizuje_od_roku`` / ``organizuje_do_roku`` year columns are compared
	with the year of the new ``organizuje_od`` / ``organizuje_do`` values.
	An old nickname of None is accepted against a new empty string.

	Raises:
		ValueError: when the row counts differ or any compared value does
			not match.
	"""
	from datetime import timedelta, timezone

	# The migration assigns the current time zone, so CET (a fixed UTC+1
	# offset) is forced here by hand before the year is extracted.
	cet = timezone(timedelta(hours=1))

	def check_year(org_id, label, old_year, new_stamp):
		# Both missing is fine; one missing, or a different year, is not.
		if old_year is None and new_stamp is None:
			return
		if new_stamp is None or old_year != new_stamp.astimezone(cet).year:
			raise ValueError(f'Not matching {label} for org id={org_id}: old {old_year}, new {new_stamp}')

	old_query = 'SELECT * FROM seminar_organizator ORDER BY id'
	new_query = '''SELECT seminar_organizator.id AS id, studuje, strucny_popis_organizatora, users.id AS uid, osoba.prezdivka AS o_prezdivka, osoba.foto AS o_foto, organizuje_od, organizuje_do
		FROM seminar_organizator
		JOIN seminar_osoby AS osoba ON osoba_id = osoba.id 
		JOIN auth_user AS users ON osoba.user_id = users.id 
		ORDER BY seminar_organizator.id'''

	# The nickname is not listed here, it gets its own check below.
	old_fields = ['studuje', 'strucny_popis_organizatora', 'user_id', 'foto']
	new_fields = ['studuje', 'strucny_popis_organizatora', 'uid', 'o_foto']

	old_res, new_res = execute_simple(old_query, new_query)
	for o, n in zip(old_res, new_res):
		check_same(o, n, old_fields, new_fields)

		check_year(o['id'], 'organizuje_od', o['organizuje_od_roku'], n['organizuje_od'])
		check_year(o['id'], 'organizuje_do', o['organizuje_do_roku'], n['organizuje_do'])

		old_nick, new_nick = o['prezdivka'], n['o_prezdivka']
		if old_nick == new_nick:
			continue
		if old_nick is None and new_nick == '':
			continue
		raise ValueError(f'Not matching prezdivka for org id={o["id"]}: old {old_nick}, new {new_nick}')
 | |
| 
 | |
| 
 | |
def check_rocnik():
	"""Compare the seminar_rocniky table between the old and the new database.

	Rows are read from both databases ordered by id and compared pairwise.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = ['id', 'prvni_rok', 'rocnik', 'exportovat']
	old_rows, new_rows = execute_simple("SELECT * FROM seminar_rocniky ORDER BY id")
	for pair in zip(old_rows, new_rows):
		check_same(pair[0], pair[1], compared_columns)
 | |
| 
 | |
def check_cislo():
	"""Compare the seminar_cisla table between the old and the new database.

	The same query runs on both sides; the old column ``cislo`` is compared
	with the new column ``poradi``, all other columns keep their names.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	shared_tail = ['datum_vydani', 'datum_deadline', 'verejne', 'poznamka', 'pdf']
	old_columns = ['id', 'rocnik_id', 'cislo'] + shared_tail
	new_columns = ['id', 'rocnik_id', 'poradi'] + shared_tail

	old_rows, new_rows = execute_simple("SELECT * FROM seminar_cisla ORDER BY id")
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, old_columns, new_columns)
 | |
| 
 | |
def check_priloha_reseni():
	"""Compare the seminar_priloha_reseni table between the two databases.

	The old column ``timestamp`` is compared with the new column
	``vytvoreno``; the remaining columns keep their names.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	# Rows are compared pairwise, so both databases must return them in the
	# same, well-defined order. Without ORDER BY the order is unspecified.
	old_query = "SELECT * FROM seminar_priloha_reseni ORDER BY id"

	old_res, new_res = execute_simple(old_query)
	res = zip(old_res, new_res)

	for o, n in res:
		check_same(o, n, ['id', 'reseni_id', 'timestamp', 'soubor', 'poznamka'],
				['id', 'reseni_id', 'vytvoreno', 'soubor', 'poznamka'])
 | |
| 
 | |
def check_soustredeni():
	"""Compare the seminar_soustredeni table between the two databases.

	Only the columns of the table itself are compared here; participants
	and organizers are checked by their own functions.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = [
		'id', 'rocnik_id', 'datum_zacatku', 'datum_konce', 'verejne',
		'misto', 'text', 'typ', 'exportovat',
	]
	old_rows, new_rows = execute_simple("SELECT * FROM seminar_soustredeni ORDER BY id")
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
 | |
| 
 | |
def check_soustredeni_ucastnici():
	"""Compare seminar_soustredeni_ucastnici between the two databases.

	Rows are read from both databases ordered by id and compared pairwise.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = ['id', 'resitel_id', 'soustredeni_id', 'poznamka']
	query = "SELECT * FROM seminar_soustredeni_ucastnici ORDER BY id"
	old_rows, new_rows = execute_simple(query)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
 | |
| 
 | |
def check_soustredeni_organizatori():
	"""Compare seminar_soustredeni_organizatori between the two databases.

	Rows are read from both databases ordered by id and compared pairwise.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = ['id', 'organizator_id', 'soustredeni_id', 'poznamka']
	query = "SELECT * FROM seminar_soustredeni_organizatori ORDER BY id"
	old_rows, new_rows = execute_simple(query)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
 | |
| 
 | |
def check_nastaveni():
	"""Compare the seminar_nastaveni table between the two databases.

	Rows are read from both databases ordered by id and compared pairwise
	on ``id`` and ``aktualni_cislo_id``.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	compared_columns = ['id', 'aktualni_cislo_id']
	old_rows, new_rows = execute_simple("SELECT * FROM seminar_nastaveni ORDER BY id")
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
 | |
| 
 | |
def check_novinky():
	"""Compare the seminar_novinky table between the two databases.

	Plain columns are compared directly. The author is compared indirectly:
	the new ``autor_id`` is translated with get_user_id_for_org_id and the
	result has to equal the old ``autor_id``.

	Raises:
		ValueError: when the row counts differ, a compared value differs, or
			the authors do not match.
	"""
	compared_columns = ['id', 'datum', 'text', 'obrazek', 'zverejneno']
	old_rows, new_rows = execute_simple("SELECT * FROM seminar_novinky ORDER BY id")
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, compared_columns)
		author_user_id = get_user_id_for_org_id(new_row['autor_id'])
		if author_user_id != old_row['autor_id']:
			raise ValueError("Nesedi autori u novinek")
 | |
| 			
 | |
def check_pohadka():
	"""Compare fairy tales (seminar_pohadky) between the two databases.

	The new query collects the text through the tree-node tables and also
	the task ids found on the neighbouring nodes (``uloha_pred`` through a
	node whose ``succ_id`` points at the tale's node, ``uloha_po`` through
	the tale node's own ``succ_id``).

	Checked per row: id, creation time, text, the author (when the old row
	has one) and the task the tale is attached to.

	Raises:
		ValueError: when the row counts differ, a compared value differs,
			the authors do not match, or the tale hangs on the wrong task.
	"""
	sql_old = "SELECT * FROM seminar_pohadky ORDER BY id"
	sql_new = """SELECT sp.id AS id, sp.autor_id AS autor_id, sp.vytvoreno AS vytvoreno, snp.treenode_ptr_id AS treenode_ptr_id, st.na_web AS text,
				zn_pred.uloha_id AS uloha_pred, zn_po.uloha_id AS uloha_po
			FROM seminar_pohadky AS sp 
			-- Text pohádky
			INNER JOIN seminar_nodes_pohadka AS snp ON sp.id = snp.pohadka_id 
			INNER JOIN seminar_nodes_treenode AS snt ON snt.id = snp.treenode_ptr_id
			INNER JOIN seminar_nodes_obsah AS sno ON sno.treenode_ptr_id = snt.first_child_id 
			INNER JOIN seminar_texty AS st ON sno.text_id = st.id
			-- Predchozí úloha
			LEFT OUTER JOIN seminar_nodes_treenode AS ztn_pred ON ztn_pred.succ_id = snt.id
			LEFT OUTER JOIN seminar_nodes_uloha_zadani AS zn_pred ON zn_pred.treenode_ptr_id = ztn_pred.id
			-- Následující úloha
			LEFT OUTER JOIN seminar_nodes_uloha_zadani AS zn_po ON zn_po.treenode_ptr_id = snt.succ_id

			ORDER BY sp.id"""

	old_rows, new_rows = execute_simple(sql_old, sql_new)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, ['id', 'timestamp', 'text'], ['id', 'vytvoreno', 'text'])

		# The author is only compared when the old row actually has one.
		old_author = old_row['autor_id']
		if old_author is not None and get_user_id_for_org_id(new_row['autor_id']) != old_author:
			raise ValueError("Nesedi autori u pohadky")

		# Correct task.
		# NOTE: old_row['pred'] says whether the tale comes before the task,
		# not whether the relevant task comes before the tale!
		if old_row['pred']:
			position_key = 'uloha_po'
		else:
			position_key = 'uloha_pred'
		new_task = new_row[position_key]
		if old_row['uloha_id'] != new_task:
			raise ValueError(f"Pohádka přidružená ke špatné úloze! old: {old_row['uloha_id']}, new: {new_task}, pozice: {position_key}")
 | |
| 
 | |
| 
 | |
# Problems are checked separately for each type:
 | |
def check_problem_common():
	"""Compare the columns shared by all problem types (seminar_problemy).

	Part one compares the common columns row by row; in the new database
	the author is resolved from organizer through seminar_osoby to
	auth_user, and ``text_org`` / ``timestamp`` are compared with
	``poznamka`` / ``vytvoreno``.

	Part two checks that every (problem, organizer) reviewer pair derived
	from the old database is present in seminar_problemy_opravovatele.

	Raises:
		ValueError: when the row counts of part one differ, a compared value
			differs, or a reviewer pair is missing in the new database.
	"""
	old_query = "SELECT id, nazev, stav, kod, autor_id, text_org, timestamp, typ FROM seminar_problemy ORDER BY id"
	new_query = """SELECT sp.id AS id, sp.nazev AS nazev, sp.stav AS stav, sp.kod AS kod, au.id AS autor_id, sp.poznamka AS poznamka, sp.vytvoreno AS vytvoreno
		FROM seminar_problemy AS sp
		LEFT OUTER JOIN seminar_organizator AS so ON sp.autor_id = so.id
		LEFT OUTER JOIN seminar_osoby AS sos ON so.osoba_id = sos.id
		LEFT OUTER JOIN auth_user AS au ON sos.user_id = au.id
		ORDER BY sp.id"""

	same_fields = ['id', 'nazev', 'stav', 'autor_id', 'kod']
	renamed_fields = [
		('text_org', 'poznamka'),
		('timestamp', 'vytvoreno'),
		]
	old_fields = same_fields + [f[0] for f in renamed_fields]
	new_fields = same_fields + [f[1] for f in renamed_fields]

	old_res, new_res = execute_simple(old_query, new_query)
	for o, n in zip(old_res, new_res):
		check_same(o, n, old_fields, new_fields)

	# Reviewers (opravovatele)
	# The old opravovatel_id is joined against organizer.user_id, so the
	# organizer has to be looked up to get a pair comparable with the new table.
	old_query = """SELECT seminar_problemy.id, org.id AS opravovatel_id FROM seminar_problemy
		JOIN seminar_organizator AS org ON seminar_problemy.opravovatel_id = org.user_id;"""
	new_query = "SELECT problem_id, organizator_id FROM seminar_problemy_opravovatele"

	oldcur.execute(old_query)
	old_results = oldcur.fetchall()
	newcur.execute(new_query)
	# Rows are turned into tuples so they can live in a set; this replaces a
	# linear scan of the whole new result for every old pair.
	new_pairs = {tuple(newr) for newr in newcur.fetchall()}

	for oldr in old_results:
		if tuple(oldr) not in new_pairs:
			raise ValueError(f'Opravovatel pair {oldr} not found in new db.')

	# Focus areas (zamereni) are to be verified by eye (#1186)
 | |
| 
 | |
| 
 | |
def check_uloha():
	"""Compare problems of type 'uloha' with seminar_ulohy in the new database.

	The new query gathers the assignment and solution texts through the
	tree-node tables (missing texts are turned into '' by COALESCE) so they
	can be compared with the old ``text_zadani`` / ``text_reseni`` columns.
	The old ``id`` and ``body`` are compared with ``problem_ptr_id`` and
	``max_body``. Any task with a deadline issue set is reported as an error.

	Raises:
		ValueError: when the row counts differ, a compared value differs, or
			a task has ``cislo_deadline_id`` set.
	"""
	sql_old = "SELECT * FROM seminar_problemy WHERE typ = 'uloha' ORDER BY id"
	sql_new = """SELECT cislo_zadani_id, cislo_reseni_id, problem_ptr_id, max_body, COALESCE(uzt.na_web, '') AS text_zadani, COALESCE(uvt.na_web, '') AS text_reseni, cislo_deadline_id
	FROM seminar_ulohy
	-- Problém:
	JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
	-- Text zadání:
	-- ZadaniNode a VzorakNode maji existovat vzdy, ale obsah nemusi (pokud ho nemaji)
	INNER JOIN seminar_nodes_uloha_zadani AS uzn ON problem.id = uzn.uloha_id
	INNER JOIN seminar_nodes_treenode AS uztn ON uztn.id = uzn.treenode_ptr_id
	LEFT OUTER JOIN seminar_nodes_obsah AS uzo ON uzo.treenode_ptr_id = uztn.first_child_id
	LEFT OUTER JOIN seminar_texty AS uzt ON uzo.text_id = uzt.id
	-- Text vzoráku:
	INNER JOIN seminar_nodes_uloha_vzorak AS uvn ON problem.id = uvn.uloha_id
	INNER JOIN seminar_nodes_treenode AS uvtn ON uvtn.id = uvn.treenode_ptr_id
	LEFT OUTER JOIN seminar_nodes_obsah AS uvo ON uvo.treenode_ptr_id = uvtn.first_child_id
	LEFT OUTER JOIN seminar_texty AS uvt ON uvo.text_id = uvt.id

	ORDER BY problem_ptr_id"""

	# Position i of one list is compared with position i of the other.
	old_columns = ['cislo_zadani_id', 'cislo_reseni_id', 'text_zadani', 'text_reseni', 'id', 'body']
	new_columns = ['cislo_zadani_id', 'cislo_reseni_id', 'text_zadani', 'text_reseni', 'problem_ptr_id', 'max_body']

	old_rows, new_rows = execute_simple(sql_old, sql_new)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, old_columns, new_columns)
		# The deadline looks empty everywhere, so that is what is assumed.
		if new_row['cislo_deadline_id'] is None:
			continue
		raise ValueError("Úloha má deadline.")
 | |
| 
 | |
def check_tema():
	"""Compare problems of type 'tema' and 'serial' with seminar_temata.

	The old side reads the texts, the type and the year of the issue the
	topic was assigned in. The new side collects the texts through the
	tree-node tables (missing texts become '' through COALESCE) and takes
	the year from seminar_nodes_rocnik joined on the node's ``root_id``.

	Raises:
		ValueError: from execute_simple or check_same when the row counts or
			any compared value differ.
	"""
	sql_old = """SELECT text_zadani, text_reseni, typ, c.rocnik_id AS rocnik_id
	FROM seminar_problemy
	LEFT OUTER JOIN seminar_cisla AS c ON c.id = cislo_zadani_id
	WHERE typ IN ('tema', 'serial')
	ORDER BY  seminar_problemy.id"""
	sql_new = """SELECT tema_typ, COALESCE(zad_text.na_web, '') AS text_zadani, COALESCE(res_text.na_web, '') AS text_reseni, rn.rocnik_id AS rocnik_id
	FROM seminar_temata
	-- Problém:
	JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
	-- Text:
	-- TvCNode má dva potomky, oba TextNode. První drží původní text zadání, druhý řešení.
	INNER JOIN seminar_nodes_temavcisle AS tvcn ON tvcn.tema_id = id
	INNER JOIN seminar_nodes_treenode AS ttn ON tvcn.treenode_ptr_id = ttn.id
	LEFT OUTER JOIN seminar_nodes_treenode AS zad_tn ON ttn.first_child_id = zad_tn.id -- jen 33 z nich ma zadani
	LEFT OUTER JOIN seminar_nodes_treenode AS res_tn ON zad_tn.succ_id = res_tn.id -- jen 4 z nich ma reseni
	LEFT OUTER JOIN seminar_nodes_obsah AS zad_on ON zad_on.treenode_ptr_id = zad_tn.id
	LEFT OUTER JOIN seminar_nodes_obsah AS res_on ON res_on.treenode_ptr_id = res_tn.id
	LEFT OUTER JOIN seminar_texty AS zad_text ON zad_on.text_id = zad_text.id
	LEFT OUTER JOIN seminar_texty AS res_text ON res_on.text_id = res_text.id -- vsechny 4
	-- Ročník tématu:
	-- Podle rootu TvCN
	LEFT OUTER JOIN seminar_nodes_rocnik AS rn ON ttn.root_id = rn.treenode_ptr_id

	ORDER BY problem_ptr_id"""

	# Position i of one list is compared with position i of the other.
	old_columns = ['text_zadani', 'text_reseni', 'rocnik_id', 'typ']
	new_columns = ['text_zadani', 'text_reseni', 'rocnik_id', 'tema_typ']

	old_rows, new_rows = execute_simple(sql_old, sql_new)
	for old_row, new_row in zip(old_rows, new_rows):
		check_same(old_row, new_row, old_columns, new_columns)
 | |
| 
 | |
def check_konfera():
	"""Make sure neither database contains any 'konfera' record.

	Runs one query on each module-level cursor and looks only at the
	reported row counts.

	Raises:
		ValueError: if the old database has a problem of type 'konfera' or
			the new seminar_konfera table is not empty.
	"""
	oldcur.execute("SELECT * FROM seminar_problemy WHERE typ = 'konfera'")
	newcur.execute("SELECT * FROM seminar_konfera")

	if any(cursor.rowcount != 0 for cursor in (oldcur, newcur)):
		raise ValueError('There exists a Konfera!')
 | |
| 
 | |
def check_org_clanek():
	"""Make sure the old database has no problem of type 'org-clanek'.

	Raises:
		ValueError: if the query on the old database returns any row.
	"""
	oldcur.execute("SELECT * FROM seminar_problemy WHERE typ = 'org-clanek'")

	if oldcur.rowcount == 0:
		return
	raise ValueError('There exists a Org-clanek!')
 | |
| 
 | |
def check_res_clanek():
	"""Compare problems of type 'res-clanek' with seminar_clanky.

	Two (!) articles have a text (assignment) that has to be preserved.
	The new query reaches the text through the solution's tree node; for
	articles without any text it yields NULL, which is treated as ''.
	The old ``cislo_zadani_id`` is compared with the new ``cislo_id``.

	Raises:
		ValueError: when the row counts differ, a compared value differs, or
			an old article has a non-empty ``text_reseni``.
	"""
	old_query = "SELECT * FROM seminar_problemy WHERE typ = 'res-clanek' ORDER BY id"
	new_query = """SELECT cislo_id, text.na_web AS text_zadani
	FROM seminar_clanky
	JOIN seminar_problemy AS problem ON problem_ptr_id = problem.id
	INNER JOIN seminar_hodnoceni AS hodn ON problem.id = hodn.problem_id
	INNER JOIN seminar_reseni AS rese ON rese.id = hodn.reseni_id
	INNER JOIN seminar_nodes_otistene_reseni AS rn ON rese.text_cely_id = rn.treenode_ptr_id -- Tenhle radek neni potreba, ale ujistuje se mj. o spravnem typu TreeNode.
	INNER JOIN seminar_nodes_treenode AS tn ON rn.treenode_ptr_id = tn.id
	-- Nektere clanky vubec nemely text, tak jim migr 0058 nevyrobila dalsi treenody
	LEFT OUTER JOIN seminar_nodes_obsah AS son ON son.treenode_ptr_id = tn.first_child_id
	LEFT OUTER JOIN seminar_texty AS text ON text.id = son.text_id

	ORDER BY problem_ptr_id"""

	old_fields = ['text_zadani', 'cislo_zadani_id']
	new_fields = ['text_zadani', 'cislo_id']

	old_res, new_res = execute_simple(old_query, new_query)
	for o, n in zip(old_res, new_res):
		# text_zadani may be None on the new side; the old side has ''.
		if n['text_zadani'] is None:
			n['text_zadani'] = ''
		check_same(o, n, old_fields, new_fields)
		# An explicit raise (not assert) so the check also runs under -O.
		if o['text_reseni'] != '':
			raise ValueError(f"Res-clanek id={o['id']} has a non-empty text_reseni")
 | |
| 
 | |
def check_untyped_problem():
	"""Make sure every problem in the old database has one of the known types.

	Raises:
		ValueError: if some row of seminar_problemy has a ``typ`` outside
			the known set, or no ``typ`` at all (NULL).
	"""
	# NOT IN never matches NULL, so a missing type has to be asked for
	# explicitly.
	old_query = "SELECT * FROM seminar_problemy WHERE typ IS NULL OR typ NOT IN ('uloha', 'tema', 'serial', 'konfera', 'org-clanek', 'res-clanek')"

	oldcur.execute(old_query)

	if oldcur.rowcount != 0:
		raise ValueError('There exists a Problem without type!')
 | |
| 
 | |
| 
 | |
| 
 | |
# Run every check in the listed order. A failing check raises, which stops
# the script before the remaining checks run.
for run_check in (
	check_skola,
	check_resitel,
	check_reseni,
	check_organizator,
	check_rocnik,
	check_cislo,
	check_priloha_reseni,
	check_soustredeni,
	check_soustredeni_ucastnici,
	check_soustredeni_organizatori,
	check_nastaveni,
	check_novinky,
	check_pohadka,
	# Problems, one check per type:
	check_problem_common,
	check_uloha,
	check_tema,
	check_konfera,
	check_org_clanek,
	check_res_clanek,
	check_untyped_problem,
):
	run_check()
 |