mirror of
https://github.com/johndoe6345789/WizardMerge.git
synced 2026-04-25 14:14:59 +00:00
304 lines
10 KiB
Python
304 lines
10 KiB
Python
"""Extract image XObjects from wizardmerge.pdf and emit a JSON manifest.
|
|
|
|
The script avoids external dependencies so it can run in constrained environments.
|
|
Flate-encoded images are converted into PNG byte streams, while DCT-encoded
|
|
images are treated as JPEG. A companion ``images.json`` file captures every
|
|
image's metadata, a lightweight content analysis, and a base64 payload without
|
|
writing raw binaries to disk. Semantic file names are generated from the
|
|
analysis (color, contrast, orientation) so the manifest is easier to navigate.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import pathlib
|
|
import re
|
|
import struct
|
|
import zlib
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
|
|
# Input document; a relative path, so it is looked up from the current
# working directory when the script runs.
PDF_PATH = pathlib.Path("wizardmerge.pdf")
# Directory that receives ``images.json`` (and ``errors.txt`` when some
# images could not be converted). Created on demand by ``save_images``.
OUTPUT_DIR = pathlib.Path("extracted_graphics")
|
|
|
|
|
|
@dataclass
class ImageObject:
    """One image found in the PDF: its declared properties and encoded stream."""

    object_number: int  # number taken from the "<n> <gen> obj" header
    width: int  # value of /Width
    height: int  # value of /Height
    color_space: str  # PDF name such as "/DeviceRGB"
    bits_per_component: int  # value of /BitsPerComponent
    filter: str  # PDF name of the stream filter, "" when none was found
    stream: bytes  # stream payload, still encoded with ``filter``

    @property
    def channels(self) -> int:
        """Return the number of colour components per pixel.

        ``/DeviceRGB`` maps to 3 and ``/DeviceGray`` to 1; RGB is tested
        first.

        Raises:
            ValueError: if the colour space mentions neither of the two.
        """
        known_spaces = (("/DeviceRGB", 3), ("/DeviceGray", 1))
        for marker, component_count in known_spaces:
            if marker in self.color_space:
                return component_count
        raise ValueError(f"Unsupported colorspace {self.color_space!r}")
|
|
|
|
|
|
# Matches one indirect object, "<number> <generation> obj ... endobj".
# Group 1 is the object number, group 2 everything between ``obj`` and
# ``endobj``. The body match is non-greedy, so it stops at the first
# ``endobj`` byte sequence it meets -- including one that happens to occur
# inside binary stream data.
OBJECT_PATTERN = re.compile(rb"(\d+)\s+\d+\s+obj(.*?)endobj", re.DOTALL)
|
|
|
|
|
|
def _extract_stream(obj_bytes: bytes) -> bytes:
|
|
"""Return the raw stream bytes for a PDF object."""
|
|
|
|
stream_match = re.search(rb"stream\r?\n", obj_bytes)
|
|
if not stream_match:
|
|
raise ValueError("No stream found in object")
|
|
|
|
start = stream_match.end()
|
|
length_match = re.search(rb"/Length\s+(\d+)", obj_bytes)
|
|
if length_match:
|
|
length = int(length_match.group(1))
|
|
return obj_bytes[start : start + length]
|
|
|
|
end = obj_bytes.find(b"endstream", start)
|
|
return obj_bytes[start:end].rstrip(b"\r\n")
|
|
|
|
|
|
def iter_image_objects(pdf_bytes: bytes) -> Iterable[ImageObject]:
    """Yield image objects discovered in the PDF payload.

    Every ``<n> <gen> obj ... endobj`` span matched by ``OBJECT_PATTERN`` is
    inspected. Only the part in front of the ``stream`` keyword (the object
    dictionary) is searched for metadata, so binary stream data can neither
    make a non-image look like an image nor supply bogus dimensions.

    Args:
        pdf_bytes: Complete contents of the PDF file.

    Yields:
        One ``ImageObject`` per object whose dictionary contains
        ``/Subtype /Image``. A missing ``/ColorSpace`` name defaults to
        ``"/DeviceRGB"`` and a missing ``/Filter`` name to ``""``.

    Raises:
        ValueError: if an image lacks ``/Width``, ``/Height`` or
            ``/BitsPerComponent``. Errors raised by ``_extract_stream``
            propagate unchanged.
    """

    # A PDF name runs until whitespace or a delimiter character, so
    # "/FlateDecode>>" must yield "/FlateDecode", not "/FlateDecode>>".
    name_token = rb"(/[^\s/<>\[\]()%{}]+)"

    def _lookup(header: bytes, key: bytes) -> Optional[str]:
        """Return the name stored under *key* (bare or as a one-item array)."""
        found = re.search(
            rb"/" + key + rb"\s*(?:" + name_token + rb"|\[\s*" + name_token + rb"\s*\])",
            header,
        )
        if not found:
            return None
        return (found.group(1) or found.group(2)).decode("ascii")

    for match in OBJECT_PATTERN.finditer(pdf_bytes):
        obj_bytes = match.group(0)

        stream_keyword = re.search(rb"stream\r?\n", obj_bytes)
        header = obj_bytes[: stream_keyword.start()] if stream_keyword else obj_bytes

        # Whitespace between the key and the name is optional in PDF syntax.
        if not re.search(rb"/Subtype\s*/Image\b", header):
            continue

        object_number = int(match.group(1))

        width_match = re.search(rb"/Width\s+(\d+)", header)
        height_match = re.search(rb"/Height\s+(\d+)", header)
        bits_match = re.search(rb"/BitsPerComponent\s+(\d+)", header)

        if not (width_match and height_match and bits_match):
            raise ValueError(f"Image {object_number} missing dimension metadata")

        yield ImageObject(
            object_number=object_number,
            width=int(width_match.group(1)),
            height=int(height_match.group(1)),
            color_space=_lookup(header, b"ColorSpace") or "/DeviceRGB",
            bits_per_component=int(bits_match.group(1)),
            filter=_lookup(header, b"Filter") or "",
            stream=_extract_stream(obj_bytes),
        )
|
|
|
|
|
|
def _png_chunk(tag: bytes, payload: bytes) -> bytes:
|
|
length = struct.pack(">I", len(payload))
|
|
crc = struct.pack(">I", zlib.crc32(tag + payload) & 0xFFFFFFFF)
|
|
return length + tag + payload + crc
|
|
|
|
|
|
def _dominant_color_label(means: Tuple[float, ...]) -> str:
|
|
"""Return a coarse color label from per-channel means."""
|
|
|
|
if len(means) == 1:
|
|
gray = means[0]
|
|
if gray < 16:
|
|
return "black"
|
|
if gray < 64:
|
|
return "dark-gray"
|
|
if gray < 160:
|
|
return "mid-gray"
|
|
if gray < 224:
|
|
return "light-gray"
|
|
return "white"
|
|
|
|
red, green, blue = means
|
|
brightness = (red + green + blue) / 3
|
|
if max(red, green, blue) - min(red, green, blue) < 12:
|
|
# Essentially grayscale
|
|
return _dominant_color_label((brightness,))
|
|
|
|
dominant_channel = max(range(3), key=lambda idx: (red, green, blue)[idx])
|
|
channel_names = {0: "red", 1: "green", 2: "blue"}
|
|
brightness_label = _dominant_color_label((brightness,))
|
|
return f"{brightness_label}-{channel_names[dominant_channel]}"
|
|
|
|
|
|
def _orientation_tag(width: int, height: int) -> str:
|
|
if width == height:
|
|
return "square"
|
|
if width > height:
|
|
return "landscape"
|
|
return "portrait"
|
|
|
|
|
|
def analyse_flate_image(image: ImageObject) -> Dict[str, object]:
    """Compute basic colour statistics for a Flate-encoded image.

    The stream is inflated and read as one byte per channel per pixel.

    Returns:
        A dict with per-channel ``means``, ``variances`` (population
        variance), ``min`` and ``max`` tuples; ``palette_size`` (number of
        distinct pixel values, or ``None`` once 1024 or more were seen);
        ``primary_color`` and ``orientation`` labels.

    Raises:
        ValueError: if the inflated size is not width * channels * height
            (also raised by ``image.channels`` for unsupported colour
            spaces).
    """

    pixels = zlib.decompress(image.stream)
    channel_count = image.channels
    expected_size = image.width * channel_count * image.height
    if len(pixels) != expected_size:
        raise ValueError(
            f"Unexpected data length for image {image.object_number}: "
            f"got {len(pixels)}, expected {expected_size}"
        )

    means_acc: List[float] = []
    variance_acc: List[float] = []
    lows: List[int] = []
    highs: List[int] = []
    for offset in range(channel_count):
        # Every channel_count-th byte starting at ``offset`` is one channel.
        plane = pixels[offset::channel_count]
        seen = 0
        mean = 0.0
        m2 = 0.0
        # Running mean / sum of squared deviations, updated per sample.
        for value in plane:
            seen += 1
            delta = value - mean
            mean += delta / seen
            m2 += delta * (value - mean)
        means_acc.append(mean)
        variance_acc.append(m2 / max(seen, 1))
        # Byte values lie in 0..255, so these defaults only matter for an
        # empty plane.
        lows.append(min(plane, default=255))
        highs.append(max(plane, default=0))

    palette_limit = 1024
    distinct: set = set()
    for start in range(0, len(pixels), channel_count):
        if len(distinct) >= palette_limit:
            break  # stop collecting once the cap is reached
        distinct.add(tuple(pixels[start : start + channel_count]))

    means = tuple(means_acc)
    return {
        "means": means,
        "variances": tuple(variance_acc),
        "min": tuple(lows),
        "max": tuple(highs),
        "palette_size": len(distinct) if len(distinct) < palette_limit else None,
        "primary_color": _dominant_color_label(means),
        "orientation": _orientation_tag(image.width, image.height),
    }
|
|
|
|
|
|
def semantic_name(image: ImageObject, mime: str, analysis: Optional[Dict[str, object]]) -> str:
    """Build a descriptive file name for *image*.

    With a non-empty *analysis* the name is
    ``<primary color>-<texture>-<orientation>-<W>x<H>-obj<N>``; without one
    it is ``jpeg-<orientation>-<W>x<H>-obj<N>``. The extension is ``png``
    for ``image/png`` and ``jpg`` for every other MIME type.
    """

    if analysis:
        palette_size = analysis.get("palette_size")
        variances: Tuple[float, ...] = analysis.get("variances", ())  # type: ignore[assignment]
        variance_score = sum(variances) / max(len(variances), 1)

        if palette_size == 1:
            texture = "solid"
        elif palette_size and palette_size <= 4:
            texture = "two-tone"
        elif variance_score < 400:
            texture = "low-contrast"
        else:
            texture = "detailed"

        parts = [
            analysis.get("primary_color") or "unknown",
            texture,
            str(analysis.get("orientation", "unknown")),
        ]
    else:
        parts = ["jpeg", _orientation_tag(image.width, image.height)]

    parts += [f"{image.width}x{image.height}", f"obj{image.object_number}"]
    suffix = "png" if mime == "image/png" else "jpg"
    return "-".join(parts) + f".{suffix}"
|
|
|
|
|
|
def raw_to_png(image: ImageObject) -> tuple[bytes, Dict[str, object]]:
    """Re-wrap a Flate-encoded 8-bit image as PNG and analyse it.

    Returns:
        ``(png_bytes, analysis)`` where *analysis* is the result of
        ``analyse_flate_image``.

    Raises:
        ValueError: if the image does not use 8 bits per component; errors
            from ``analyse_flate_image`` propagate as well.
    """

    if image.bits_per_component != 8:
        raise ValueError(f"Unsupported bit depth: {image.bits_per_component}")

    analysis = analyse_flate_image(image)

    pixels = zlib.decompress(image.stream)
    stride = image.width * image.channels

    # PNG scanlines each start with a filter-type byte; 0 means "None".
    scanlines = bytearray()
    for offset in range(0, len(pixels), stride):
        scanlines += b"\x00"
        scanlines += pixels[offset : offset + stride]

    # Colour type 2 is truecolour (RGB), 0 is grayscale.
    color_type = 2 if image.channels == 3 else 0
    header = struct.pack(
        ">IIBBBBB", image.width, image.height, 8, color_type, 0, 0, 0
    )

    pieces = (
        b"\x89PNG\r\n\x1a\n",
        _png_chunk(b"IHDR", header),
        _png_chunk(b"IDAT", zlib.compress(bytes(scanlines))),
        _png_chunk(b"IEND", b""),
    )
    return b"".join(pieces), analysis
|
|
|
|
|
|
def save_images(images: List[ImageObject]) -> None:
    """Convert *images* and write the JSON manifest into ``OUTPUT_DIR``.

    Images are processed in ascending object-number order. Flate images are
    converted to PNG (with analysis), DCT images are passed through as JPEG.
    An image that cannot be converted is skipped and reported rather than
    aborting the run.

    Side effects:
        * creates ``OUTPUT_DIR`` if needed;
        * writes ``images.json`` (one entry per captured image, payload
          base64-encoded);
        * writes ``errors.txt`` when at least one image was skipped;
        * prints a progress line per image plus a summary.
    """

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    manifest: List[dict[str, object]] = []
    errors: List[str] = []

    for image in sorted(images, key=lambda im: im.object_number):
        analysis: Optional[Dict[str, object]] = None

        try:
            if image.filter == "/FlateDecode":
                raw_bytes, analysis = raw_to_png(image)
                mime = "image/png"
            elif image.filter == "/DCTDecode":
                # A DCT stream is already a complete JPEG file.
                raw_bytes = image.stream
                mime = "image/jpeg"
            else:
                raise ValueError(f"Unsupported filter {image.filter}")
        except Exception as exc:  # noqa: BLE001 - surface helpful error context
            # Deliberately broad: one bad image must not stop the others.
            placeholder = f"obj{image.object_number}"
            errors.append(f"{placeholder}: {exc}")
            print(f"Skipping {placeholder}: {exc}")
            continue

        name = semantic_name(image, mime, analysis)
        encoded = base64.b64encode(raw_bytes).decode("ascii")
        manifest.append(
            {
                "name": name,
                "object_number": image.object_number,
                "width": image.width,
                "height": image.height,
                "color_space": image.color_space,
                "bits_per_component": image.bits_per_component,
                "mime": mime,
                "base64": encoded,
                "analysis": analysis,
            }
        )
        print(f"Captured {name} ({image.width}x{image.height}, {mime})")

    # An explicit encoding keeps the output independent of the locale.
    images_path = OUTPUT_DIR / "images.json"
    images_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    if errors:
        (OUTPUT_DIR / "errors.txt").write_text("\n".join(errors), encoding="utf-8")
        print(f"Encountered errors for {len(errors)} image(s); see errors.txt")
    print(f"Wrote JSON manifest to {images_path}")
|
|
|
|
|
|
def main() -> None:
    """Read ``PDF_PATH``, collect its image objects and write the manifest."""
    save_images(list(iter_image_objects(PDF_PATH.read_bytes())))


if __name__ == "__main__":
    main()
|