from typing import Union import os import shutil import subprocess from PIL import Image from .context import Context class FileInWrongDirError(Exception): pass class ConversionProgramError(Exception): pass class InkscapeError(ConversionProgramError): pass class ImageMagickError(ConversionProgramError): pass class AsyError(ConversionProgramError): pass class ImageProcessorNamespace: public_dir: str cache_dir: str lookup_dirs: list[str] web_path: str include_src: bool def __init__(self, public_dir: str, web_path: str, cache_dir: str, lookup_dirs: list[str], include_src: bool): self.public_dir = public_dir self.cache_dir = cache_dir self.lookup_dirs = lookup_dirs self.web_path = web_path if web_path[-1] != "/" else web_path[:-1] self.include_src = include_src class ImageProcessorSearcher: def get_lookup_dirs(self) -> list[str]: return [] def get_cache_dir(self) -> str: return "" def get_public_dir(self) -> str: return "" def get_web_path(self) -> str: return "" def find_image_in_dir(self, input_filename: str, dir: str) -> Union[str, None]: if os.path.isfile(dir + "/" + input_filename): return dir + "/" + input_filename else: return None def find_image(self, input_filename: str) -> Union[str, None]: for dir in self.get_lookup_dirs(): image = self.find_image_in_dir(input_filename, dir) if image: return image return None def publish_image(self, target_name, relative: bool=True) -> str: cache_path = self.get_cache_dir() + "/" + target_name if not os.path.isfile(cache_path): raise FileNotFoundError(f'Image {target_name} not cached') target_path = self.get_public_dir() + "/" + target_name try: if os.path.exists(target_path): if os.path.getmtime(cache_path) > os.path.getmtime(target_path): os.remove(target_path) os.link(cache_path, target_path) else: os.link(cache_path, target_path) except OSError as e: if e.errno == 18: # Invalid cross-device link: cache and public dirs are on different devices, don't hardlink, copy shutil.copyfile(cache_path, target_path) else: raise e return target_name if relative else target_path class ImageProcessorCacheSearcher(ImageProcessorSearcher): cache_dir: str def __init__(self, cache_dir: str): self.cache_dir = cache_dir if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir, exist_ok=True) def get_lookup_dirs(self) -> list[str]: return [self.cache_dir] def get_cache_dir(self) -> str: return self.cache_dir def get_public_dir(self) -> str: return "" def get_web_path(self) -> str: return "" def publish_image(self, target_name, relative: bool=True) -> str: raise NotImplementedError(); class ImageProcessorNamespaceSearcher(ImageProcessorSearcher): namespace: ImageProcessorNamespace rel_dir: str source_dir: str def __init__(self, namespace: ImageProcessorNamespace, rel_dir: str, source_dir: str): self.namespace = namespace self.rel_dir = rel_dir self.source_dir = source_dir def get_lookup_dirs(self) -> list[str]: return self.namespace.lookup_dirs + ([self.source_dir] if self.namespace.include_src else []) def transform_path(self, path: str) -> str: return path.replace("$dir", self.rel_dir) def get_cache_dir(self) -> str: cache_dir = self.transform_path(self.namespace.cache_dir) if not os.path.exists(cache_dir): os.makedirs(cache_dir, exist_ok=True) return cache_dir def get_public_dir(self) -> str: public_dir = self.transform_path(self.namespace.public_dir) if not os.path.exists(public_dir): os.makedirs(public_dir, exist_ok=True) return public_dir def get_web_path(self) -> str: return self.transform_path(self.namespace.web_path) def get_cache_searcher(self) -> ImageProcessorCacheSearcher: return ImageProcessorCacheSearcher(self.get_cache_dir()) class ImageProcessor: namespaces: dict[str, ImageProcessorNamespace] def __init__(self, namespaces: dict[str, ImageProcessorNamespace]): self.namespaces = namespaces def get_namespace_by_path(self, path: str) -> ImageProcessorNamespace: return self.namespaces[path.split(":")[0] if ":" in path else ""] def get_path_without_namespace(self, path: str) -> str: if len(path.split(":")) <= 1: return path return ":".join(path.split(":")[1:]) def get_searcher_by_path(self, path: str, rel_dir: str, source_dir: str) -> ImageProcessorNamespaceSearcher: return ImageProcessorNamespaceSearcher(self.get_namespace_by_path(path), rel_dir, source_dir) def process_image(self, input_filename: str, format: str, searcher: ImageProcessorSearcher, context: Context=None, width: int=None, height:int=None, quality: int=None, dpi: int=None, fit: bool=True, deps: list[str]=[]) -> str: name = os.path.basename(input_filename) base, ext = os.path.splitext(name) ext = ext[1:] full_path = searcher.find_image(input_filename) if full_path is None: raise FileNotFoundError(f'Image {input_filename} not found in {searcher.get_lookup_dirs()}.') if format == "jpg": format = "jpeg" if format == "": format = ext # Locate all dependencies deps_full = [full_path] for dep in deps: dep_full_path = searcher.find_image(dep) if dep_full_path is None: raise FileNotFoundError(f'Image dependency {dep} not found.') deps_full.append(dep_full_path) # Generate filename from arguments suffix = "" geometry = None if width is not None or height is not None: geometry = f'{width if width is not None else ""}x{height if height is not None else ""}{"" if fit else "!"}' suffix += "_"+geometry if quality is not None: suffix += f'_q{quality}' target_name = base+suffix+"."+format target_path = searcher.get_cache_dir() + "/" + target_name # Only regenerate if the file doesn't already exist and no dependencies are newer if not os.path.isfile(target_path) or self.is_outdated(target_path, deps_full): # If the format is the same or it is just a different extension for # the same format, just copy it. if (((ext == format) or (ext == "epdf" and format == "pdf") or (ext == "jpg" and format == "jpeg")) and width is None and height is None and quality is None and dpi is None): shutil.copyfile(full_path, target_path) # Try to find the converted filename in lookup_dirs, if you find # it, don't convert, just copy. elif searcher.find_image(target_name) is not None and not self.is_outdated(searcher.find_image(target_name), deps): shutil.copyfile(searcher.find_image(target_name), target_path) # Process asymptote elif ext == "asy": # Collect dependencies deps_dir = searcher.get_cache_dir() + "/" + name + "_deps" if not os.path.isdir(deps_dir): os.mkdir(deps_dir) for dep_full in deps_full: dep = os.path.basename(dep_full) if not os.path.isfile(deps_dir + "/" + dep) or os.path.getmtime(deps_dir + "/" + dep) < os.path.getmtime(dep_full): shutil.copyfile(dep_full, deps_dir + "/" + dep) dpi_arg = ['-render', str(dpi/72)] if dpi is not None else [] if subprocess.run(['asy', name, '-o', target_name, '-f', format, *dpi_arg], cwd=deps_dir).returncode != 0: raise AsyError(f"Could not convert '{full_path}' to '{format}'") shutil.move(deps_dir + "/" + target_name, searcher.get_cache_dir() + "/" + target_name) # Convert SVGs using inkscape elif ext == "svg": width_arg = ['--export-width', str(width)] if width is not None else [] height_arg = ['--export-height', str(height)] if height is not None else [] dpi_arg = ['--export-dpi', str(dpi)] if dpi is not None else [] if subprocess.run(['inkscape', full_path, '-o', target_path, *width_arg, *height_arg, *dpi_arg]).returncode != 0: raise InkscapeError(f"Could not convert '{full_path}' to '{format}'") # Convert everything else using ImageMagick. else: resize_arg = ['-resize', str(geometry)] if geometry is not None else [] density_arg = ['-density', str(dpi)] if dpi is not None else [] quality_arg = ['-quality', str(quality)] if quality is not None else [] if subprocess.run(['convert', *density_arg, full_path, *resize_arg, *quality_arg, target_path]).returncode != 0: raise ImageMagickError(f"Could not convert '{full_path}' to '{format}'") if context is not None: context.add_deps(deps_full) return target_name def is_outdated(self, target: str, deps: list[str]): target_timestamp = os.path.getmtime(target) for dep in deps: dep_timestamp = os.path.getmtime(dep) if dep_timestamp > target_timestamp: return True return False def get_image_size(self, full_path: str) -> tuple[int, int]: # Getting image size using ImageMagick is slow. VERY return Image.open(full_path).size