You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

257 lines
8.5 KiB

import errno
import os
import shutil
import subprocess
from typing import Union

from PIL import Image

from .context import Context
class FileInWrongDirError(Exception):
    """Error type for a file located in the wrong directory.

    NOTE(review): nothing in the visible part of this module raises it;
    the exact condition is decided by the caller.
    """
class ConversionProgramError(Exception):
    """Base class for failures of an external image conversion program."""
class InkscapeError(ConversionProgramError):
    """Raised when the ``inkscape`` command exits with a non-zero status."""
class ImageMagickError(ConversionProgramError):
    """Raised when ImageMagick's ``convert`` exits with a non-zero status."""
class AsyError(ConversionProgramError):
    """Raised when the ``asy`` (Asymptote) command exits with a non-zero status."""
class ImageProcessorNamespace:
    """Plain configuration record describing one image namespace.

    The values are stored as given, except that a single trailing slash is
    stripped from ``web_path``.
    """

    # Directory published images are placed in.
    public_dir: str
    # Directory converted images are cached in.
    cache_dir: str
    # Directories searched for source images.
    lookup_dirs: list[str]
    # Web path of the namespace, stored without one trailing "/".
    web_path: str
    # Whether the directory of the including source is searched as well.
    include_src: bool

    def __init__(self, public_dir: str, web_path: str, cache_dir: str, lookup_dirs: list[str], include_src: bool):
        """Store the namespace settings.

        Args:
            public_dir: Directory for published images.
            web_path: Web path of the namespace; one trailing ``/`` is removed.
            cache_dir: Directory for cached conversions.
            lookup_dirs: Directories searched for images.
            include_src: Whether the source directory is searched too.
        """
        self.public_dir = public_dir
        self.cache_dir = cache_dir
        self.lookup_dirs = lookup_dirs
        # endswith() also copes with an empty web_path, where indexing
        # web_path[-1] would raise IndexError.
        self.web_path = web_path[:-1] if web_path.endswith("/") else web_path
        self.include_src = include_src
class ImageProcessorSearcher:
    """Base class for locating images and publishing cached ones.

    The ``get_*`` methods return empty defaults here; subclasses override
    them to supply real directories.
    """

    def get_lookup_dirs(self) -> list[str]:
        """Return the directories searched by :meth:`find_image`."""
        return []

    def get_cache_dir(self) -> str:
        """Return the directory holding cached (converted) images."""
        return ""

    def get_public_dir(self) -> str:
        """Return the directory images are published into."""
        return ""

    def get_web_path(self) -> str:
        """Return the web path associated with this searcher."""
        return ""

    def find_image_in_dir(self, input_filename: str, dir: str) -> Union[str, None]:
        """Return ``dir/input_filename`` if it is an existing file, else None."""
        candidate = dir + "/" + input_filename
        return candidate if os.path.isfile(candidate) else None

    def find_image(self, input_filename: str) -> Union[str, None]:
        """Return the first match for *input_filename* in the lookup dirs.

        Directories are tried in the order given by :meth:`get_lookup_dirs`;
        None is returned when no directory contains the file.
        """
        for lookup_dir in self.get_lookup_dirs():
            found = self.find_image_in_dir(input_filename, lookup_dir)
            if found:
                return found
        return None

    def publish_image(self, target_name, relative: bool=True) -> str:
        """Make a cached image available in the public directory.

        The cached file is hard-linked into the public directory.  An
        existing public file is replaced only when the cached one has a
        newer modification time.  If hard-linking fails because the two
        directories are on different devices, the file is copied instead.

        Args:
            target_name: File name of the image inside the cache directory.
            relative: When true return *target_name*, otherwise return the
                full path of the published file.

        Raises:
            FileNotFoundError: The image is not present in the cache.
            OSError: Linking failed for a reason other than a cross-device
                link.
        """
        cache_path = self.get_cache_dir() + "/" + target_name
        if not os.path.isfile(cache_path):
            raise FileNotFoundError(f'Image {target_name} not cached')

        target_path = self.get_public_dir() + "/" + target_name
        try:
            if os.path.exists(target_path):
                # Only replace the published file when the cache is newer.
                if os.path.getmtime(cache_path) > os.path.getmtime(target_path):
                    os.remove(target_path)
                    os.link(cache_path, target_path)
            else:
                os.link(cache_path, target_path)
        except OSError as e:
            if e.errno != errno.EXDEV:
                # Bare raise keeps the original traceback intact.
                raise
            # Invalid cross-device link: cache and public dirs are on
            # different devices, so hard-linking is impossible; copy.
            shutil.copyfile(cache_path, target_path)

        return target_name if relative else target_path
class ImageProcessorCacheSearcher(ImageProcessorSearcher):
    """Searcher whose single lookup directory is a cache directory.

    It has no public directory or web path, and publishing through it is
    not supported.
    """

    cache_dir: str

    def __init__(self, cache_dir: str):
        """Remember *cache_dir* and create it if nothing exists at that path."""
        self.cache_dir = cache_dir
        cache_missing = not os.path.exists(self.cache_dir)
        if cache_missing:
            os.makedirs(self.cache_dir, exist_ok=True)

    def get_lookup_dirs(self) -> list[str]:
        """Return a one-element list holding the cache directory."""
        return [self.cache_dir]

    def get_cache_dir(self) -> str:
        """Return the cache directory."""
        return self.cache_dir

    def get_public_dir(self) -> str:
        """Return an empty string; this searcher has no public directory."""
        return ""

    def get_web_path(self) -> str:
        """Return an empty string; this searcher has no web path."""
        return ""

    def publish_image(self, target_name, relative: bool=True) -> str:
        """Always raise NotImplementedError; publishing is unsupported here."""
        raise NotImplementedError()
class ImageProcessorNamespaceSearcher(ImageProcessorSearcher):
    """Searcher bound to one namespace and one source location.

    Directory templates taken from the namespace may contain the literal
    ``$dir`` placeholder, which is replaced by ``rel_dir``.
    """

    namespace: ImageProcessorNamespace
    rel_dir: str
    source_dir: str

    def __init__(self, namespace: ImageProcessorNamespace, rel_dir: str, source_dir: str):
        """Store the namespace, the ``$dir`` substitution and the source dir."""
        self.namespace = namespace
        self.rel_dir = rel_dir
        self.source_dir = source_dir

    def get_lookup_dirs(self) -> list[str]:
        """Return the namespace lookup dirs, plus the source dir if enabled."""
        extra = [self.source_dir] if self.namespace.include_src else []
        return self.namespace.lookup_dirs + extra

    def transform_path(self, path: str) -> str:
        """Substitute every ``$dir`` in *path* with ``rel_dir``."""
        return path.replace("$dir", self.rel_dir)

    def _resolve_dir(self, template: str) -> str:
        """Expand *template* and create the directory if nothing exists there."""
        resolved = self.transform_path(template)
        if not os.path.exists(resolved):
            os.makedirs(resolved, exist_ok=True)
        return resolved

    def get_cache_dir(self) -> str:
        """Return the expanded cache directory, creating it when missing."""
        return self._resolve_dir(self.namespace.cache_dir)

    def get_public_dir(self) -> str:
        """Return the expanded public directory, creating it when missing."""
        return self._resolve_dir(self.namespace.public_dir)

    def get_web_path(self) -> str:
        """Return the namespace web path with ``$dir`` expanded."""
        return self.transform_path(self.namespace.web_path)

    def get_cache_searcher(self) -> ImageProcessorCacheSearcher:
        """Return a searcher that looks only in this searcher's cache dir."""
        return ImageProcessorCacheSearcher(self.get_cache_dir())
class ImageProcessor:
    """Convert images to a requested format and cache the results.

    Images are addressed as ``namespace:path``; a path without a colon
    belongs to the namespace registered under the empty string.
    """

    namespaces: dict[str, "ImageProcessorNamespace"]

    def __init__(self, namespaces: dict[str, "ImageProcessorNamespace"]):
        """Store the mapping from namespace name to namespace settings."""
        self.namespaces = namespaces

    def get_namespace_by_path(self, path: str) -> "ImageProcessorNamespace":
        """Return the namespace selected by the prefix before the first ``:``.

        Paths without a colon select the namespace keyed by ``""``.

        Raises:
            KeyError: No namespace is registered under the selected name.
        """
        prefix, colon, _ = path.partition(":")
        return self.namespaces[prefix if colon else ""]

    def get_path_without_namespace(self, path: str) -> str:
        """Return *path* with everything up to the first ``:`` removed.

        A path without a colon is returned unchanged.
        """
        _, colon, remainder = path.partition(":")
        return remainder if colon else path

    def get_searcher_by_path(self, path: str, rel_dir: str, source_dir: str) -> "ImageProcessorNamespaceSearcher":
        """Build a searcher for the namespace that *path* refers to."""
        return ImageProcessorNamespaceSearcher(self.get_namespace_by_path(path), rel_dir, source_dir)

    def process_image(
        self,
        input_filename: str,
        format: str,
        searcher: "ImageProcessorSearcher",
        context: "Union[Context, None]"=None,
        width: Union[int, None]=None,
        height: Union[int, None]=None,
        quality: Union[int, None]=None,
        dpi: Union[int, None]=None,
        fit: bool=True,
        deps: Union[list[str], None]=None,
    ) -> str:
        """Produce *input_filename* in *format* inside the searcher's cache dir.

        The cached file is regenerated only when it is missing or older than
        the source image or one of its dependencies.  Depending on the
        source extension the work is done by a plain copy, ``asy``,
        ``inkscape`` or ImageMagick's ``convert``.

        Args:
            input_filename: Image name, resolved through ``searcher.find_image``.
            format: Target format; ``"jpg"`` is treated as ``"jpeg"``.
            searcher: Provides the lookup directories and the cache directory.
            context: If given, receives the resolved dependency paths through
                ``add_deps``.
            width: Requested width, or None.
            height: Requested height, or None.
            quality: Output quality passed to ImageMagick, or None.
            dpi: Resolution passed to the conversion program, or None.
            fit: When false, ``!`` is appended to the geometry string.
            deps: Names of additional files the image depends on, resolved
                the same way as *input_filename*.

        Returns:
            The file name (not the full path) of the image in the cache dir.

        Raises:
            FileNotFoundError: The image or one of its dependencies was not
                found.
            AsyError: ``asy`` exited with a non-zero status.
            InkscapeError: ``inkscape`` exited with a non-zero status.
            ImageMagickError: ``convert`` exited with a non-zero status.
        """
        name = os.path.basename(input_filename)
        base, ext = os.path.splitext(name)
        ext = ext[1:]

        full_path = searcher.find_image(input_filename)
        if full_path is None:
            raise FileNotFoundError(f'Image {input_filename} not found in {searcher.get_lookup_dirs()}.')

        if format == "jpg":
            format = "jpeg"

        # Locate all dependencies; the source image itself is the first one.
        deps_full = [full_path]
        for dep in (deps if deps is not None else ()):
            dep_full_path = searcher.find_image(dep)
            if dep_full_path is None:
                raise FileNotFoundError(f'Image dependency {dep} not found.')
            deps_full.append(dep_full_path)

        # Generate filename from arguments
        suffix = ""
        geometry = None
        if width is not None or height is not None:
            geometry = f'{width if width is not None else ""}x{height if height is not None else ""}{"" if fit else "!"}'
            suffix += "_" + geometry
        if quality is not None:
            suffix += f'_q{quality}'

        cache_dir = searcher.get_cache_dir()
        target_name = base + suffix + "." + format
        target_path = cache_dir + "/" + target_name

        # Only regenerate if the file doesn't already exist or a dependency is newer
        if not os.path.isfile(target_path) or self.is_outdated(target_path, deps_full):
            same_format = (
                ext == format
                or (ext == "epdf" and format == "pdf")
                or (ext == "jpg" and format == "jpeg")
            )
            unmodified = width is None and height is None and quality is None and dpi is None

            # Only looked up when a plain copy of the source is not enough.
            preconverted = None if (same_format and unmodified) else searcher.find_image(target_name)

            if same_format and unmodified:
                # Same format (or just a different extension for it) and no
                # transformation requested: just copy.
                shutil.copyfile(full_path, target_path)
            elif preconverted is not None and not self.is_outdated(preconverted, deps_full):
                # An up-to-date converted file already exists in the lookup
                # dirs: copy it instead of converting.  Freshness is checked
                # against the resolved paths (deps_full), never against the
                # caller's relative dependency names.
                shutil.copyfile(preconverted, target_path)
            elif ext == "asy":
                # Process asymptote: run it in a directory that holds the
                # source together with all of its dependencies.
                deps_dir = cache_dir + "/" + name + "_deps"
                if not os.path.isdir(deps_dir):
                    os.mkdir(deps_dir)
                for dep_full in deps_full:
                    staged = deps_dir + "/" + os.path.basename(dep_full)
                    if not os.path.isfile(staged) or os.path.getmtime(staged) < os.path.getmtime(dep_full):
                        shutil.copyfile(dep_full, staged)
                dpi_arg = ['-render', str(dpi / 72)] if dpi is not None else []
                if subprocess.run(['asy', name, '-o', target_name, '-f', format, *dpi_arg], cwd=deps_dir).returncode != 0:
                    raise AsyError(f"Could not convert '{full_path}' to '{format}'")
                shutil.move(deps_dir + "/" + target_name, target_path)
            elif ext == "svg":
                # Convert SVGs using inkscape
                width_arg = ['--export-width', str(width)] if width is not None else []
                height_arg = ['--export-height', str(height)] if height is not None else []
                dpi_arg = ['--export-dpi', str(dpi)] if dpi is not None else []
                if subprocess.run(['inkscape', full_path, '-o', target_path, *width_arg, *height_arg, *dpi_arg]).returncode != 0:
                    raise InkscapeError(f"Could not convert '{full_path}' to '{format}'")
            else:
                # Convert everything else using ImageMagick.
                resize_arg = ['-resize', geometry] if geometry is not None else []
                density_arg = ['-density', str(dpi)] if dpi is not None else []
                quality_arg = ['-quality', str(quality)] if quality is not None else []
                if subprocess.run(['convert', *density_arg, full_path, *resize_arg, *quality_arg, target_path]).returncode != 0:
                    raise ImageMagickError(f"Could not convert '{full_path}' to '{format}'")

        if context is not None:
            context.add_deps(deps_full)

        return target_name

    def is_outdated(self, target: str, deps: list[str]) -> bool:
        """Return True if any file in *deps* is newer than *target*.

        Modification times are compared; every path must exist.
        """
        target_timestamp = os.path.getmtime(target)
        return any(os.path.getmtime(dep) > target_timestamp for dep in deps)

    def get_image_size(self, full_path: str) -> tuple[int, int]:
        """Return the ``size`` of the image at *full_path* as reported by PIL."""
        # Getting image size using ImageMagick is slow. VERY
        # The context manager closes the file handle PIL opens.
        with Image.open(full_path) as image:
            return image.size