|
@ -4,46 +4,161 @@ import shutil |
|
|
import subprocess |
|
|
import subprocess |
|
|
from PIL import Image |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FileInWrongDirError(Exception): |
|
|
class FileInWrongDirError(Exception): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ConversionProgramError(Exception): |
|
|
class ConversionProgramError(Exception): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InkscapeError(ConversionProgramError): |
|
|
class InkscapeError(ConversionProgramError): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImageMagickError(ConversionProgramError): |
|
|
class ImageMagickError(ConversionProgramError): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AsyError(ConversionProgramError): |
|
|
class AsyError(ConversionProgramError): |
|
|
pass |
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImageProcessorNamespace: |
|
|
class ImageProcessor: |
|
|
|
|
|
public_dir: str |
|
|
public_dir: str |
|
|
cache_dir: str |
|
|
cache_dir: str |
|
|
lookup_dirs: list[str] |
|
|
lookup_dirs: list[str] |
|
|
web_path: str |
|
|
web_path: str |
|
|
|
|
|
include_src: bool |
|
|
|
|
|
|
|
|
def __init__(self, public_dir: str, web_path: str, cache_dir: str, *lookup_dirs: list[str]): |
|
|
def __init__(self, public_dir: str, web_path: str, cache_dir: str, lookup_dirs: list[str], include_src: bool): |
|
|
self.public_dir = public_dir |
|
|
self.public_dir = public_dir |
|
|
self.cache_dir = cache_dir |
|
|
self.cache_dir = cache_dir |
|
|
self.lookup_dirs = lookup_dirs |
|
|
self.lookup_dirs = lookup_dirs |
|
|
self.web_path = web_path if web_path[-1] != "/" else web_path[:-1] |
|
|
self.web_path = web_path if web_path[-1] != "/" else web_path[:-1] |
|
|
|
|
|
self.include_src = include_src |
|
|
if not os.path.exists(self.public_dir): |
|
|
if not os.path.exists(self.public_dir): |
|
|
os.mkdir(self.public_dir) |
|
|
os.mkdir(self.public_dir) |
|
|
if not os.path.exists(self.cache_dir): |
|
|
if not os.path.exists(self.cache_dir): |
|
|
os.mkdir(self.cache_dir) |
|
|
os.mkdir(self.cache_dir) |
|
|
|
|
|
|
|
|
def process_image(self, input_filename: str, format: str, source_dir: str, width: int=None, height:int=None, quality: int=None, dpi: int=None, fit: bool=True, deps: list[str]=[]) -> str: |
|
|
|
|
|
|
|
|
class ImageProcessorSearcher: |
|
|
|
|
|
def get_lookup_dirs(self) -> list[str]: |
|
|
|
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
def get_cache_dir(self) -> str: |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
def get_public_dir(self) -> str: |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
def get_web_path(self) -> str: |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
def find_image_in_dir(self, input_filename: str, dir: str) -> Union[str, None]: |
|
|
|
|
|
if os.path.isfile(dir + "/" + input_filename): |
|
|
|
|
|
return dir + "/" + input_filename |
|
|
|
|
|
else: |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def find_image(self, input_filename: str) -> Union[str, None]: |
|
|
|
|
|
for dir in self.get_lookup_dirs(): |
|
|
|
|
|
image = self.find_image_in_dir(input_filename, dir) |
|
|
|
|
|
if image: |
|
|
|
|
|
return image |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def publish_image(self, target_name, relative: bool=True) -> str: |
|
|
|
|
|
cache_path = self.get_cache_dir() + "/" + target_name |
|
|
|
|
|
if not os.path.isfile(cache_path): |
|
|
|
|
|
raise FileNotFoundError(f'Image {target_name} not cached') |
|
|
|
|
|
target_path = self.get_public_dir() + "/" + target_name |
|
|
|
|
|
try: |
|
|
|
|
|
if os.path.exists(target_path): |
|
|
|
|
|
if os.path.getmtime(cache_path) > os.path.getmtime(target_path): |
|
|
|
|
|
os.remove(target_path) |
|
|
|
|
|
os.link(cache_path, target_path) |
|
|
|
|
|
else: |
|
|
|
|
|
os.link(cache_path, target_path) |
|
|
|
|
|
except OSError as e: |
|
|
|
|
|
if e.errno == 18: # Invalid cross-device link: cache and public dirs are on different devices, don't hardlink, copy |
|
|
|
|
|
shutil.copyfile(cache_path, target_path) |
|
|
|
|
|
else: |
|
|
|
|
|
raise e |
|
|
|
|
|
return target_name if relative else target_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImageProcessorCacheSearcher(ImageProcessorSearcher): |
|
|
|
|
|
cache_dir: str |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, cache_dir: str): |
|
|
|
|
|
self.cache_dir = cache_dir |
|
|
|
|
|
|
|
|
|
|
|
def get_lookup_dirs(self) -> list[str]: |
|
|
|
|
|
return [self.cache_dir] |
|
|
|
|
|
|
|
|
|
|
|
def get_cache_dir(self) -> str: |
|
|
|
|
|
return self.cache_dir |
|
|
|
|
|
|
|
|
|
|
|
def get_public_dir(self) -> str: |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
def get_web_path(self) -> str: |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
def publish_image(self, target_name, relative: bool=True) -> str: |
|
|
|
|
|
raise NotImplementedError(); |
|
|
|
|
|
|
|
|
|
|
|
class ImageProcessorNamespaceSearcher(ImageProcessorSearcher): |
|
|
|
|
|
namespace: ImageProcessorNamespace |
|
|
|
|
|
rel_dir: str |
|
|
|
|
|
source_dir: str |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, namespace: ImageProcessorNamespace, rel_dir: str, source_dir: str): |
|
|
|
|
|
self.namespace = namespace |
|
|
|
|
|
self.rel_dir = rel_dir |
|
|
|
|
|
self.source_dir = source_dir |
|
|
|
|
|
|
|
|
|
|
|
def get_lookup_dirs(self) -> list[str]: |
|
|
|
|
|
return self.namespace.lookup_dirs + ([self.source_dir] if self.namespace.include_src else []) |
|
|
|
|
|
|
|
|
|
|
|
def transform_path(self, path: str) -> str: |
|
|
|
|
|
return path.replace("$dir", self.rel_dir) |
|
|
|
|
|
|
|
|
|
|
|
def get_cache_dir(self) -> str: |
|
|
|
|
|
return self.transform_path(self.namespace.cache_dir) |
|
|
|
|
|
|
|
|
|
|
|
def get_public_dir(self) -> str: |
|
|
|
|
|
return self.transform_path(self.namespace.public_dir) |
|
|
|
|
|
|
|
|
|
|
|
def get_web_path(self) -> str: |
|
|
|
|
|
return self.transform_path(self.namespace.web_path) |
|
|
|
|
|
|
|
|
|
|
|
def get_cache_searcher(self) -> ImageProcessorCacheSearcher: |
|
|
|
|
|
return ImageProcessorCacheSearcher(self.get_cache_dir()) |
|
|
|
|
|
|
|
|
|
|
|
class ImageProcessor: |
|
|
|
|
|
namespaces: dict[str, ImageProcessorNamespace] |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, namespaces: dict[str, ImageProcessorNamespace]): |
|
|
|
|
|
self.namespaces = namespaces |
|
|
|
|
|
|
|
|
|
|
|
def get_namespace_by_path(self, path: str) -> ImageProcessorNamespace: |
|
|
|
|
|
return self.namespaces[path.split(":")[0] if ":" in path else ""] |
|
|
|
|
|
|
|
|
|
|
|
def get_searcher_by_path(self, path: str, rel_dir: str, source_dir: str) -> ImageProcessorNamespaceSearcher: |
|
|
|
|
|
return ImageProcessorNamespaceSearcher(self.get_namespace_by_path(path), rel_dir, source_dir) |
|
|
|
|
|
|
|
|
|
|
|
def process_image(self, input_filename: str, format: str, searcher: ImageProcessorSearcher, width: int=None, height:int=None, quality: int=None, dpi: int=None, fit: bool=True, deps: list[str]=[]) -> str: |
|
|
name = os.path.basename(input_filename) |
|
|
name = os.path.basename(input_filename) |
|
|
base, ext = os.path.splitext(name) |
|
|
base, ext = os.path.splitext(name) |
|
|
ext = ext[1:] |
|
|
ext = ext[1:] |
|
|
full_path = self.find_image(input_filename, [source_dir]) |
|
|
full_path = searcher.find_image(input_filename) |
|
|
if full_path is None: |
|
|
if full_path is None: |
|
|
raise FileNotFoundError(f'Image {input_filename} not found in {self.lookup_dirs} or {source_dir}.') |
|
|
raise FileNotFoundError(f'Image {input_filename} not found in {searcher.get_lookup_dirs()}.') |
|
|
|
|
|
|
|
|
if format == "jpg": |
|
|
if format == "jpg": |
|
|
format = "jpeg" |
|
|
format = "jpeg" |
|
@ -51,7 +166,7 @@ class ImageProcessor: |
|
|
# Locate all dependencies |
|
|
# Locate all dependencies |
|
|
deps_full = [full_path] |
|
|
deps_full = [full_path] |
|
|
for dep in deps: |
|
|
for dep in deps: |
|
|
dep_full_path = self.find_image(dep, [source_dir]) |
|
|
dep_full_path = searcher.find_image(dep) |
|
|
if dep_full_path is None: |
|
|
if dep_full_path is None: |
|
|
raise FileNotFoundError(f'Image dependency {dep} not found.') |
|
|
raise FileNotFoundError(f'Image dependency {dep} not found.') |
|
|
deps_full.append(dep_full_path) |
|
|
deps_full.append(dep_full_path) |
|
@ -65,7 +180,7 @@ class ImageProcessor: |
|
|
if quality is not None: |
|
|
if quality is not None: |
|
|
suffix += f'_q{quality}' |
|
|
suffix += f'_q{quality}' |
|
|
target_name = base+suffix+"."+format |
|
|
target_name = base+suffix+"."+format |
|
|
target_path = self.cache_dir + "/" + target_name |
|
|
target_path = searcher.get_cache_dir() + "/" + target_name |
|
|
|
|
|
|
|
|
# Only regenerate if the file doesn't already exist and no dependencies are newer |
|
|
# Only regenerate if the file doesn't already exist and no dependencies are newer |
|
|
if not os.path.isfile(target_path) or self.is_outdated(target_path, deps_full): |
|
|
if not os.path.isfile(target_path) or self.is_outdated(target_path, deps_full): |
|
@ -80,13 +195,13 @@ class ImageProcessor: |
|
|
|
|
|
|
|
|
# Try to find the converted filename in lookup_dirs, if you find |
|
|
# Try to find the converted filename in lookup_dirs, if you find |
|
|
# it, don't convert, just copy. |
|
|
# it, don't convert, just copy. |
|
|
elif self.find_image(target_name, [source_dir]) is not None and not self.is_outdated(self.find_image(target_name, [source_dir]), deps): |
|
|
elif searcher.find_image(target_name) is not None and not self.is_outdated(searcher.find_image(target_name), deps): |
|
|
shutil.copyfile(self.find_image(target_name, [source_dir]), target_path) |
|
|
shutil.copyfile(searcher.find_image(target_name), target_path) |
|
|
|
|
|
|
|
|
# Process asymptote |
|
|
# Process asymptote |
|
|
elif ext == "asy": |
|
|
elif ext == "asy": |
|
|
# Collect dependencies |
|
|
# Collect dependencies |
|
|
deps_dir = self.cache_dir + "/" + name + "_deps" |
|
|
deps_dir = searcher.get_cache_dir() + "/" + name + "_deps" |
|
|
if not os.path.isdir(deps_dir): |
|
|
if not os.path.isdir(deps_dir): |
|
|
os.mkdir(deps_dir) |
|
|
os.mkdir(deps_dir) |
|
|
for dep_full in deps_full: |
|
|
for dep_full in deps_full: |
|
@ -96,7 +211,7 @@ class ImageProcessor: |
|
|
dpi_arg = ['-render', str(dpi/72)] if dpi is not None else [] |
|
|
dpi_arg = ['-render', str(dpi/72)] if dpi is not None else [] |
|
|
if subprocess.run(['asy', name, '-o', target_name, '-f', format, *dpi_arg], cwd=deps_dir).returncode != 0: |
|
|
if subprocess.run(['asy', name, '-o', target_name, '-f', format, *dpi_arg], cwd=deps_dir).returncode != 0: |
|
|
raise AsyError(f"Could not convert '{full_path}' to '{format}'") |
|
|
raise AsyError(f"Could not convert '{full_path}' to '{format}'") |
|
|
shutil.move(deps_dir + "/" + target_name, self.cache_dir + "/" + target_name) |
|
|
shutil.move(deps_dir + "/" + target_name, searcher.get_cache_dir() + "/" + target_name) |
|
|
|
|
|
|
|
|
# Convert SVGs using inkscape |
|
|
# Convert SVGs using inkscape |
|
|
elif ext == "svg": |
|
|
elif ext == "svg": |
|
@ -124,37 +239,7 @@ class ImageProcessor: |
|
|
return True |
|
|
return True |
|
|
return False |
|
|
return False |
|
|
|
|
|
|
|
|
def publish_image(self, target_name, relative: bool=True) -> str: |
|
|
def get_image_size(self, full_path: str) -> tuple[int, int]: |
|
|
cache_path = self.cache_dir + "/" + target_name |
|
|
|
|
|
if not os.path.isfile(cache_path): |
|
|
|
|
|
raise FileNotFoundError(f'Image {target_name} not cached') |
|
|
|
|
|
target_path = self.public_dir + "/" + target_name |
|
|
|
|
|
try: |
|
|
|
|
|
if os.path.exists(target_path): |
|
|
|
|
|
if os.path.getmtime(cache_path) > os.path.getmtime(target_path): |
|
|
|
|
|
os.remove(target_path) |
|
|
|
|
|
os.link(cache_path, target_path) |
|
|
|
|
|
else: |
|
|
|
|
|
os.link(cache_path, target_path) |
|
|
|
|
|
except OSError as e: |
|
|
|
|
|
if e.errno == 18: # Invalid cross-device link: cache and public dirs are on different devices, don't hardlink, copy |
|
|
|
|
|
shutil.copyfile(cache_path, target_path) |
|
|
|
|
|
else: |
|
|
|
|
|
raise e |
|
|
|
|
|
return target_name if relative else target_path |
|
|
|
|
|
|
|
|
|
|
|
def get_image_size(self, input_filename: str, additional_dirs: list[str]=[]) -> tuple[int, int]: |
|
|
|
|
|
full_path = self.find_image(input_filename, additional_dirs) |
|
|
|
|
|
if full_path is None: |
|
|
|
|
|
raise FileNotFoundError(f'Image {input_filename} not found.') |
|
|
|
|
|
# Getting image size using ImageMagick is slow. VERY |
|
|
# Getting image size using ImageMagick is slow. VERY |
|
|
return Image.open(full_path).size |
|
|
return Image.open(full_path).size |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_image(self, input_filename: str, additional_dirs: list[str]=[]) -> Union[str, None]: |
|
|
|
|
|
for dir in [*self.lookup_dirs, *additional_dirs]: |
|
|
|
|
|
if os.path.isfile(dir + "/" + input_filename): |
|
|
|
|
|
return dir + "/" + input_filename |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|