You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

512 lines
18 KiB

# -*- coding: utf-8 -*-
import datetime
from django.contrib.auth import get_user_model
from django.contrib.auth.decorators import permission_required, \
user_passes_test
from html.parser import HTMLParser
from django import views as DjangoViews
from django.db import transaction
from django.contrib.auth.models import AnonymousUser
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from enum import Enum
from enum import auto
4 years ago
import logging
import seminar.models as m
import seminar.treelib as t
4 years ago
logger = logging.getLogger(__name__)
org_required = permission_required('auth.org')
resitel_required = permission_required('auth.resitel')
# inspirováno django.contrib.auth.decorators permission_required
def check_perms(user):
if user.has_perms(('auth.resitel',)):
return True
if user.has_perms(('auth.org',)):
return True
return False
resitel_or_org_required = user_passes_test(check_perms)
User = get_user_model()
# Není to úplně hezké, ale budeme doufat, že to je funkční...
User.je_org = property(lambda self: self.has_perm('auth.org'))
User.je_resitel = property(lambda self: self.has_perm('auth.resitel'))
AnonymousUser.je_org = False
AnonymousUser.je_resitel = False
class FirstTagParser(HTMLParser):
def __init__(self, *args, **kwargs):
self.firstTag = None
super().__init__(*args, **kwargs)
def handle_data(self, data):
if self.firstTag == None:
self.firstTag = data
def histogram(seznam):
d = {}
for i in seznam:
if i not in d:
d[i] = 0
d[i] += 1
return d
# Pozor: zarovnáno velmi netradičně pro přehlednost
roman_numerals = zip((1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1),
('M', 'CM', 'D', 'CD', 'C', 'XC', 'L', 'XL', 'X', 'IX', 'V', 'IV', 'I'))
def roman(num):
res = ""
for i, n in roman_numerals:
res += n * (num // i)
num %= i
return res
def from_roman(rom):
if not rom:
return 0
for i, n in roman_numerals:
if rom.upper().startswith(n):
return i + from_roman(rom[len(n):])
raise Exception('Invalid roman numeral: "%s"', rom)
def seznam_problemu():
problemy = []
# Pomocna fce k formatovani problemovych hlasek
def prb(cls, msg, objs=None):
s = '<b>%s:</b> %s' % (cls.__name__, msg)
if objs:
s += ' ['
for o in objs:
try:
url = o.admin_url()
except:
url = None
if url:
s += '<a href="%s">%s</a>, ' % (url, o.pk,)
else:
s += '%s, ' % (o.pk,)
s = s[:-2] + ']'
problemy.append(s)
# Duplicita jmen
jmena = {}
for r in m.Resitel.objects.all():
j = r.osoba.plne_jmeno()
if j not in jmena:
jmena[j] = []
jmena[j].append(r)
for j in jmena:
if len(jmena[j]) > 1:
prb(m.Resitel, 'Duplicitní jméno "%s"' % (j,), jmena[j])
# Data maturity a narození
for r in m.Resitel.objects.all():
if not r.rok_maturity:
prb(m.Resitel, 'Neznámý rok maturity', [r])
if r.rok_maturity and (r.rok_maturity < 1990 or r.rok_maturity > datetime.date.today().year + 10):
prb(m.Resitel, 'Podezřelé datum maturity', [r])
if r.osoba.datum_narozeni and (
r.osoba.datum_narozeni.year < 1970 or r.osoba.datum_narozeni.year > datetime.date.today().year - 12):
prb(m.Resitel, 'Podezřelé datum narození', [r])
# if not r.email:
# prb(Resitel, u'Neznámý email', [r])
## Kontroly konzistence databáze a TreeNodů
# Články
for clanek in m.Clanek.objects.all():
# získáme řešení svázané se článkem a z něj node ve stromě
reseni = clanek.reseni_set
if (reseni.count() != 1):
raise ValueError("Článek k sobě má nejedno řešení!")
r = reseni.first()
clanek_node = r.text_cely # vazba na ReseniNode z Reseni
# content type je věc pomáhající rozeznávat různé typy objektů v django-polymorphic
# protože isinstance vrátí vždy jen TreeNode
# https://django-polymorphic.readthedocs.io/en/stable/migrating.html
cislonode_ct = ContentType.objects.get_for_model(m.CisloNode)
node = clanek_node
while node is not None:
node_ct = node.polymorphic_ctype
if node_ct == cislonode_ct: # dostali jsme se k CisloNode
# zkontrolujeme, že stromové číslo odpovídá atributu
# .cislonode je opačná vazba k treenode_ptr, abychom z TreeNode dostali
# CisloNode
if clanek.cislo != node.cislonode.cislo:
prb(m.Clanek, "Číslo otištění uložené u článku nesedí s "
"číslem otištění podle struktury treenodů.", [clanek])
break
node = t.get_parent(node)
return problemy
### Generovani obalek
def resi_v_rocniku(rocnik, cislo=None):
""" Vrátí seznam řešitelů, co vyřešili nějaký problém v daném ročníku, do daného čísla.
Parametry:
rocnik (typu Rocnik) ročník, ze kterého chci řešitele, co něco odevzdali
cislo (typu Cislo) číslo, do kterého včetně se počítá, že v daném
ročníku řešitel něco poslal.
Pokud není zadané, počítají se všechna řešení z daného ročníku.
Výstup:
QuerySet objektů typu Resitel """
if cislo is None:
# filtrujeme pouze podle ročníku
return m.Resitel.objects.filter(rok_maturity__gte=rocnik.druhy_rok(),
reseni__hodnoceni__cislo_body__rocnik=rocnik).distinct()
else: # filtrujeme podle ročníku i čísla
return m.Resitel.objects.filter(rok_maturity__gte=rocnik.druhy_rok(),
reseni__hodnoceni__cislo_body__rocnik=rocnik,
reseni__hodnoceni__cislo_body__poradi__lte=cislo.poradi).distinct()
def aktivniResitele(cislo, pouze_letosni=False):
""" Vrací QuerySet aktivních řešitelů, což jsou ti, co ještě neodmaturovali
a letos něco poslali (anebo loni něco poslali, pokud jde o první tři čísla).
Parametry:
cislo (typu Cislo) číslo, o které se jedná
pouze_letosni jen řešitelé, kteří tento rok něco poslali
"""
letos = cislo.rocnik
# detekujeme, zda jde o první tři čísla či nikoli (tj. zda spamovat řešitele z minulého roku)
zacatek_rocniku = True
try:
if int(cislo.poradi) > 3:
zacatek_rocniku = False
except ValueError:
# if cislo.poradi != '7-8':
# raise ValueError(f'{cislo} je neplatné číslo čísla (není int a není 7-8)')
zacatek_rocniku = False
# nehledě na číslo chceme jen řešitele, kteří letos něco odevzdali
if pouze_letosni:
zacatek_rocniku = False
try:
loni = m.Rocnik.objects.get(rocnik=letos.rocnik - 1)
except ObjectDoesNotExist:
# Pro první ročník neexistuje ročník předchozí
zacatek_rocniku = False
if not zacatek_rocniku:
return resi_v_rocniku(letos, cislo).filter(rok_maturity__gte=letos.druhy_rok())
else:
# spojíme querysety s řešiteli loni a letos do daného čísla
return (resi_v_rocniku(loni) | resi_v_rocniku(letos, cislo)).distinct().filter(rok_maturity__gte=letos.druhy_rok())
def viewMethodSwitch(get, post):
"""
Vrátí view, který zavolá různé jiné views podle toho, kterou metodou je zavolán.
Inspirováno https://docs.djangoproject.com/en/3.1/topics/class-based-views/mixins/#an-alternative-better-solution, jen jsem to udělal genericky.
Parametry:
post view pro metodu POST
get view pro metodu GET
V obou případech se míní view jakožto funkce, takže u class-based views se použít .as_view()
TODO: Podpora i pro metodu HEAD? A možná i pro FILES?
"""
theGetView = get
thePostView = post
class NewView(DjangoViews.View):
def get(self, request, *args, **kwargs):
return theGetView(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
return thePostView(request, *args, **kwargs)
return NewView.as_view()
def cisla_rocniku(rocnik, jen_verejne=True):
"""
Vrátí všechna čísla daného ročníku.
Parametry:
rocnik (Rocnik): ročník semináře
jen_verejne (bool): zda se mají vrátit jen veřejná, nebo všechna čísla
Vrátí:
seznam objektů typu Cislo
"""
if jen_verejne:
return rocnik.verejne_vysledkovky_cisla()
else:
return rocnik.cisla.all().order_by('poradi')
def hlavni_problem(problem):
""" Pro daný problém vrátí jeho nejvyšší nadproblém."""
while not(problem.nadproblem == None):
problem = problem.nadproblem
return problem
def problemy_rocniku(rocnik, jen_verejne=True):
return m.Problem.objects.filter(hodnoceni__in = m.Hodnoceni.objects.filter(cislo_body__in = cisla_rocniku(rocnik, jen_verejne))).distinct().select_related('nadproblem').select_related('nadproblem__nadproblem')
def problemy_cisla(cislo):
""" Vrátí seznam všech problémů s body v daném čísle. """
return m.Problem.objects.filter(hodnoceni__in = m.Hodnoceni.objects.filter(cislo_body = cislo)).distinct().non_polymorphic().select_related('nadproblem').select_related('nadproblem__nadproblem')
def hlavni_problemy_f(problemy=None):
""" Vrátí seznam všech problémů, které již nemají nadproblém. """
# hlavní problémy čísla
# (mají vlastní sloupeček ve výsledkovce, nemají nadproblém)
hlavni_problemy = set()
for p in problemy:
hlavni_problemy.add(hlavni_problem(p))
# zunikátnění
hlavni_problemy = list(hlavni_problemy)
hlavni_problemy.sort(key=lambda k: k.kod_v_rocniku) # setřídit podle t1, t2, c3, ...
return hlavni_problemy
def podproblemy_v_cislu(cislo, problemy=None, hlavni_problemy=None):
""" Vrátí seznam všech problémů s body v daném čísle v poli 'indexovaném' tématy. """
if problemy is None:
problemy = problemy_cisla(cislo)
if hlavni_problemy is None:
hlavni_problemy = hlavni_problemy_f(problemy)
podproblemy = dict((hp.id, []) for hp in hlavni_problemy)
hlavni_problemy = set(hlavni_problemy)
podproblemy[-1] = []
for problem in problemy:
h_problem = hlavni_problem(problem)
if h_problem in hlavni_problemy:
podproblemy[h_problem.id].append(problem)
else:
podproblemy[-1].append(problem)
return podproblemy
class TypDeadline(Enum):
PredDeadline = auto()
SousDeadline = auto()
FinalDeadline = auto()
def deadline_v_rocniku(datum, rocnik):
"""Funkce pro dohledání, ke kterému deadlinu daného ročníku se datum váže.
Vrací trojici (TypDeadline, Cislo, datumDeadline: date).
V případě nevalidního volání není aktuálně chování definováno(!)
"""
cisla = m.Cislo.objects.filter(rocnik=rocnik)
deadliny = []
for c in cisla:
if c.datum_preddeadline is not None:
deadliny.append((TypDeadline.PredDeadline, c, c.datum_preddeadline))
if c.datum_deadline_soustredeni is not None:
deadliny.append((TypDeadline.SousDeadline, c, c.datum_deadline_soustredeni))
if c.datum_deadline is not None:
deadliny.append((TypDeadline.FinalDeadline, c, c.datum_deadline))
deadliny = sorted(deadliny, key=lambda x: x[2]) # podle data
for dl in deadliny:
if datum <= dl[2]:
# První takový deadline je ten nejtěsnější
return dl
logger.error(f'Pro datum {datum} v ročníku {rocnik} neexistuje deadline.')
def deadline(datum):
"""Funkce pro dohledání, ke kterému deadlinu se datum váže.
Vrací trojici (TypDeadline, Cislo, datumDeadline: date). Pokud se deadline nenajde, vrátí None
"""
if isinstance(datum, datetime.datetime):
datum = datum.date()
rok = datum.year
# Dva ročníky podezřelé z obsahování dat
try:
pozdejsi_rocnik = m.Rocnik.objects.get(prvni_rok=rok)
except m.Rocnik.DoesNotExist:
pozdejsi_rocnik = None
try:
drivejsi_rocnik = m.Rocnik.objects.get(prvni_rok=rok-1)
except m.Rocnik.DoesNotExist:
drivejsi_rocnik = None
if drivejsi_rocnik is not None:
# Předpokládáme, že neexistuje číslo, které má deadline ale nemá finální deadline.
# Seznam čísel je potřeba ručně setřídit chronologicky, protože Model říká, že se řadí od nejnovějšího
posledni_deadline_drivejsiho_rocniku = m.Cislo.objects.filter(rocnik=drivejsi_rocnik, datum_deadline__isnull=False).order_by('poradi').last().datum_deadline
logger.debug(f'Nalezené ročníky: {drivejsi_rocnik}, {pozdejsi_rocnik}')
if drivejsi_rocnik is not None and datum <= posledni_deadline_drivejsiho_rocniku:
logger.debug(f'Hledám v dřívějším ročníku: {drivejsi_rocnik}')
return deadline_v_rocniku(datum, drivejsi_rocnik)
else:
logger.debug(f'Hledám v pozdějším ročníku: {pozdejsi_rocnik}')
return deadline_v_rocniku(datum, pozdejsi_rocnik)
def sync_skoly(base_url):
"""Stáhne všechny školy z mamwebu na adrese <base_url> a uloží je do databáze"""
from django.urls import reverse
full_url = base_url.rstrip('/') + reverse('export_skoly')
import requests
from django.core import serializers
json = requests.get(full_url, stream=True).content
for skola in serializers.deserialize('json', json):
skola.save()
@transaction.atomic
def merge_resitele(cilovy, zdrojovy):
"""Spojí dva řešitelské objekty do cílového.
Pojmenování "zdrojový" je silně nepřiléhající, ale co """
# Postup:
# Sjednotit / upravit informace cílového řešitele
print('Upravuji data modelu')
fieldy_shoda = ['skola', 'rok_maturity', 'zasilat', 'zasilat_cislo_emailem']
for f in fieldy_shoda:
zf = getattr(zdrojovy, f)
cf = getattr(cilovy, f)
if cf == zf:
print(f' Údaj {f} je shodný ({zf})')
else:
if zf is None:
print(f' Údaj {f} je pouze v cílovém, používám')
continue
if cf is None:
setattr(cilovy, f, zf)
cilovy.poznamka += f'\nDEBUG: Merge: doplnéný údaj {f} ze zdrojového: {zf}'
print(f" Přiřazuji {f} ze zdrojového: {zf}")
continue
# Jsou fakt různé…
# FIXME: chybí možnost na vlastní úpravu…
verdikt = input(f"\n\n Údaj {f} se u řešitele {cilovy} ({cilovy.id}) liší:\n Zdrojový: {zf}\n Cílový: {cf}\n Který použít, [z]drojový, [c]ílový? ")
verdikt = verdikt[0].casefold()
if verdikt == 'z':
setattr(cilovy, f, zf)
cilovy.poznamka += f'\nDEBUG: Merge: pro {f} použit údaj {zf} (zdrojový), nepoužit {cf} (cílový)'
elif verdikt == 'c':
cilovy.poznamka += f'\nDEBUG: Merge: pro {f} použit údaj {cf} (cílový), nepoužit {zf} (zdrojový)'
else: raise ValueError('Špatná odpověď, řešitel pravděpodobně neuložen')
# poznámku chceme nezahodit…
cilovy.poznamka += f'\nDEBUG: Merge: Původní poznámka: {zdrojovy.poznamka}'
print(f' Výsledný řešitel: {cilovy.__dict__}, ukládám')
cilovy.save()
# Přepojit všechny vazby ze zdrojového na cílového
print('Přepojuji vazby')
# Vazby: Škola (hotovo), Řešení_Řešitelé, Konfery_Účastníci, Soustředění_Účastníci, Osoba (vyřeší se později, nejde přepojit)
ct = m.Reseni_Resitele.objects.filter(resitele=zdrojovy).update(resitele=cilovy)
print(f' Přepojeno {ct} řešení')
ct = m.Konfery_Ucastnici.objects.filter(resitel=zdrojovy).update(resitel=cilovy)
print(f' Přepojeno {ct} konfer')
ct = m.Soustredeni_Ucastnici.objects.filter(resitel=zdrojovy).update(resitel=cilovy)
print(f' Přepojeno {ct} sousů')
# Teď by na zdrojovém řešiteli nemělo nic viset, smazat ho, pamatujíce si jeho Osobu
zdrosoba = zdrojovy.osoba
print(f'Mažu zdrojového řešitele {zdrojovy.__dict__}')
zdrojovy.delete()
# Spojit osoby (separátní funkce).
merge_osoby(cilovy.osoba, zdrosoba)
input("Potvrdit transakci řešitelů (^C pro zrušení) ")
@transaction.atomic
def merge_osoby(cilova, zdrojova):
""" Spojí dvě osoby do cílové
Nehlídá omezení typu "max 1 řešitel na osobu", to by měla hlídat databáze (OneToOneField)."""
# Sjednocení dat
print('Sjednocuji data osob')
# ID, User neřešíme, poznámku vyřešíme separátně.
fieldy = ['datum_narozeni', 'datum_registrace', 'datum_souhlasu_udaje',
'datum_souhlasu_zasilani', 'email', 'foto', 'jmeno', 'mesto',
'pohlavi_muz', 'prezdivka', 'prijmeni', 'psc', 'stat', 'telefon', 'ulice']
for f in fieldy:
zf = getattr(zdrojova, f)
cf = getattr(cilova, f)
if cf == zf:
print(f' Údaj {f} je shodný ({zf})')
else:
if zf is None:
print(f' Údaj {f} je pouze v cílové, používám')
continue
if cf is None:
setattr(cilova, f, zf)
cilova.poznamka += f'\nDEBUG: Merge: doplnéný údaj {f} ze zdrojové: {zf}'
print(f" Přiřazuji {f} ze zdrojové: {zf}")
continue
# Jsou fakt různé…
# FIXME: chybí možnost na vlastní úpravu…
verdikt = input(f"\n\n Údaj {f} se u osoby {cilova} ({cilova.id}) liší:\n Zdrojový: {zf}\n Cílový: {cf}\n Který použít, [z]drojový, [c]ílový? ")
verdikt = verdikt[0].casefold()
if verdikt == 'z':
setattr(cilova, f, zf)
cilova.poznamka += f'\nDEBUG: Merge: pro {f} použit údaj {zf} (zdrojová), nepoužit {cf} (cílová)'
elif verdikt == 'c':
cilova.poznamka += f'\nDEBUG: Merge: pro {f} použit údaj {cf} (cílová), nepoužit {zf} (zdrojová)'
else: raise ValueError('Špatná odpověď, řešitel pravděpodobně neuložen')
# poznámku chceme nezahodit…
cilova.poznamka += f'\nDEBUG: Merge: Původní poznámka: {zdrojova.poznamka}'
print(f' Výsledná osoba: {cilova.__dict__}, ukládám')
cilova.save()
# Vazby: Řešitel, User, Příjemce, Organizátor, Škola.kontaktní_osoba
print('Přepojuji vazby')
ct = m.Skola.objects.filter(kontaktni_osoba=zdrojova).update(kontaktni_osoba=cilova)
print(f' Přepojeno {ct} kontaktních osob')
# Ostatní vazby vyřeší OneToOneFieldy, ale někdy nemusí existovat…
ct = m.Resitel.objects.filter(osoba=zdrojova).update(osoba=cilova)
print(f' Přepojeno {ct} řešitelů')
ct = m.Prijemce.objects.filter(osoba=zdrojova).update(osoba=cilova)
print(f' Přepojeno {ct} příjemců')
ct = m.Organizator.objects.filter(osoba=zdrojova).update(osoba=cilova)
print(f' Přepojeno {ct} organizátorů')
# Uživatelé vedou opačným směrem, radši chceme zkontrolovat, že jsou různí ručně:
if zdrojova.user != cilova.user:
# Jeden z nich může být nenastavený…
if zdrojova.user is None:
print('Uživatel je již v cílové osobě')
elif cilova.user is None:
print('Používám uživatele zdrojové osoby')
cilova.user = zdrojova.user
# Teď nemůžeme uložit, protože kolize uživatelů. Ukládat cílovou budeme až po smazání zdrojové.
else: raise ValueError('Osoby mají obě uživatele, radši padám')
# Uložení a mazání
print(f'Mažu zdrojovou osobu {zdrojova.__dict__}')
zdrojova.delete()
print(f'Ukládám cílovou osobu {cilova.__dict__}')
cilova.save()
input("Potvrdit transakci osob (^C pro zrušení) ")