Initial commit

Photo-based book cataloger with AI identification.
Room → Cabinet → Shelf → Book hierarchy; FastAPI + SQLite backend;
vanilla JS SPA; OpenAI-compatible plugin system for boundary
detection, text recognition, and archive search.
This commit is contained in:
2026-03-09 14:17:13 +03:00
commit 084d1aebd5
64 changed files with 8605 additions and 0 deletions

241
src/plugins/__init__.py Normal file
View File

@@ -0,0 +1,241 @@
"""Plugin registry for bookshelf automations.
Functions are loaded from config at startup via load_plugins().
Four categories: boundary_detectors, text_recognizers, book_identifiers, archive_searchers.
"""
import logging
from typing import Any, Literal, overload
from config import AIFunctionConfig, AppConfig, CredentialConfig, ModelConfig
from models import (
AIConfig,
ASPluginResult,
ArchiveSearcherPlugin,
BDPluginResult,
BIPluginResult,
BookIdentifierPlugin,
BoundaryDetectorPlugin,
NotFoundResult,
PluginLookupResult,
PluginManifestEntry,
TextRecognizerPlugin,
TRPluginResult,
)
from .rate_limiter import RateLimiter
# Single RateLimiter instance; load_plugins() hands it to every archive searcher it builds.
RATE_LIMITER = RateLimiter()
# Module logger, used for the "Skipping ..." / "Unknown ..." warnings emitted while loading plugins.
_logger = logging.getLogger(__name__)
# ── Typed per-category registries ─────────────────────────────────────────────
# Each registry maps a plugin id (the key of the function entry in the config) to the
# plugin instance. All four are cleared and refilled by load_plugins().
_boundary_detectors: dict[str, BoundaryDetectorPlugin] = {}
_text_recognizers: dict[str, TextRecognizerPlugin] = {}
_book_identifiers: dict[str, BookIdentifierPlugin] = {}
_archive_searchers: dict[str, ArchiveSearcherPlugin] = {}
_type_to_class: dict[str, Any] = {}  # filled by the first _archive_classes() call


def _archive_classes() -> dict[str, Any]:
    """Return the archive-searcher ``type`` name → plugin class mapping.

    The mapping lives in the module-level ``_type_to_class`` dict. It is filled the
    first time this function runs (the archive modules are imported at that point,
    not at package import time) and the same dict object is returned on every call.
    """
    if _type_to_class:
        return _type_to_class

    from .archives.html_scraper import HtmlScraperPlugin
    from .archives.openlibrary import OpenLibraryPlugin
    from .archives.rsl import RSLPlugin
    from .archives.sru_catalog import SRUCatalogPlugin

    _type_to_class["openlibrary"] = OpenLibraryPlugin
    _type_to_class["rsl"] = RSLPlugin
    _type_to_class["html_scraper"] = HtmlScraperPlugin
    _type_to_class["sru_catalog"] = SRUCatalogPlugin
    return _type_to_class
def _build_ai_cfg(model_cfg: ModelConfig, cred_cfg: CredentialConfig, func: AIFunctionConfig) -> AIConfig:
    """Flatten the three config layers into the AIConfig a plugin runs with.

    Endpoint and key come from the credential entry, model name and extra request
    body from the model entry, image-size limit and confidence threshold from the
    function entry.
    """
    endpoint = cred_cfg.base_url
    secret = cred_cfg.api_key
    model_name = model_cfg.model
    pixel_limit = func.max_image_px
    min_confidence = func.confidence_threshold
    request_extras = model_cfg.extra_body
    return AIConfig(
        base_url=endpoint,
        api_key=secret,
        model=model_name,
        max_image_px=pixel_limit,
        confidence_threshold=min_confidence,
        extra_body=request_extras,
    )
def _make_ai_plugin(plugin_cls: Any, config: AppConfig, kind: str, key: str, func: AIFunctionConfig) -> Any:
    """Build one AI-backed plugin, or return None when its config chain is broken.

    Resolves ``func.model`` in ``config.models`` and that model's ``credentials`` in
    ``config.credentials``. A missing link is logged as a warning (``kind`` is the
    category label used in the message) and None is returned so the caller skips
    the entry.
    """
    model_cfg = config.models.get(func.model)
    if model_cfg is None:
        _logger.warning("Skipping %s %r: model %r not found", kind, key, func.model)
        return None
    cred_cfg = config.credentials.get(model_cfg.credentials)
    if cred_cfg is None:
        _logger.warning("Skipping %s %r: credential %r not found", kind, key, model_cfg.credentials)
        return None
    return plugin_cls(
        plugin_id=key,
        # Fall back to a title-cased form of the key when no display name is configured.
        name=func.name or key.replace("_", " ").title(),
        ai_config=_build_ai_cfg(model_cfg, cred_cfg, func),
        prompt_text=model_cfg.prompt,
        auto_queue=func.auto_queue,
        rate_limit_seconds=func.rate_limit_seconds,
    )


def load_plugins(config: AppConfig) -> None:
    """Populate the plugin registry from a typed AppConfig.

    All four registries are emptied first, so calling this again replaces the
    previous set of plugins. Entries that cannot be built (unknown boundary-detector
    key, missing model or credential, unknown archive type) are logged as warnings
    and skipped; they do not abort loading.
    """
    from .ai_compat import (
        BookIdentifierPlugin as BIClass,
        BoundaryDetectorBooksPlugin,
        BoundaryDetectorShelvesPlugin,
        TextRecognizerPlugin as TRClass,
    )

    _boundary_detectors.clear()
    _text_recognizers.clear()
    _book_identifiers.clear()
    _archive_searchers.clear()
    archive_cls = _archive_classes()

    # Boundary detectors are selected by key: only these two keys are recognised.
    bd_classes: dict[str, Any] = {
        "shelves": BoundaryDetectorShelvesPlugin,
        "books": BoundaryDetectorBooksPlugin,
    }
    for key, func in config.functions.boundary_detectors.items():
        bd_cls = bd_classes.get(key)
        if bd_cls is None:
            _logger.warning("Unknown boundary_detector key %r — must be 'shelves' or 'books'", key)
            continue
        detector = _make_ai_plugin(bd_cls, config, "boundary_detector", key, func)
        if detector is not None:
            _boundary_detectors[key] = detector

    for key, func in config.functions.text_recognizers.items():
        recognizer = _make_ai_plugin(TRClass, config, "text_recognizer", key, func)
        if recognizer is not None:
            _text_recognizers[key] = recognizer

    for key, func in config.functions.book_identifiers.items():
        identifier = _make_ai_plugin(BIClass, config, "book_identifier", key, func)
        if identifier is not None:
            _book_identifiers[key] = identifier

    # Archive searchers are selected by their configured ``type`` and share one rate limiter.
    for key, func in config.functions.archive_searchers.items():
        cls = archive_cls.get(func.type)
        if cls is None:
            _logger.warning("Skipping archive_searcher %r: unknown type %r", key, func.type)
            continue
        _archive_searchers[key] = cls(
            plugin_id=key,
            name=func.name or key.replace("_", " ").title(),
            rate_limiter=RATE_LIMITER,
            rate_limit_seconds=func.rate_limit_seconds,
            auto_queue=func.auto_queue,
            timeout=func.timeout,
            config=func.config,
        )
def get_manifest() -> list[PluginManifestEntry]:
    """Describe every registered plugin for the frontend.

    Entries are listed category by category (boundary detectors, text recognizers,
    book identifiers, archive searchers). Only boundary-detector entries carry a
    ``target``.
    """
    entries: list[PluginManifestEntry] = [
        PluginManifestEntry(
            id=pid,
            name=detector.name,
            category="boundary_detector",
            auto_queue=detector.auto_queue,
            target=detector.target,
        )
        for pid, detector in _boundary_detectors.items()
    ]
    entries.extend(
        PluginManifestEntry(id=pid, name=recognizer.name, category="text_recognizer", auto_queue=recognizer.auto_queue)
        for pid, recognizer in _text_recognizers.items()
    )
    entries.extend(
        PluginManifestEntry(id=pid, name=identifier.name, category="book_identifier", auto_queue=identifier.auto_queue)
        for pid, identifier in _book_identifiers.items()
    )
    entries.extend(
        PluginManifestEntry(id=pid, name=searcher.name, category="archive_searcher", auto_queue=searcher.auto_queue)
        for pid, searcher in _archive_searchers.items()
    )
    return entries
@overload
def get_auto_queue(category: Literal["boundary_detectors", "boundary_detector"]) -> list[BoundaryDetectorPlugin]: ...
@overload
def get_auto_queue(category: Literal["text_recognizers", "text_recognizer"]) -> list[TextRecognizerPlugin]: ...
@overload
def get_auto_queue(category: Literal["book_identifiers", "book_identifier"]) -> list[BookIdentifierPlugin]: ...
@overload
def get_auto_queue(category: Literal["archive_searchers", "archive_searcher"]) -> list[ArchiveSearcherPlugin]: ...
@overload
def get_auto_queue(
    category: str,
) -> (
    list[BoundaryDetectorPlugin] | list[TextRecognizerPlugin] | list[BookIdentifierPlugin] | list[ArchiveSearcherPlugin]
): ...
def get_auto_queue(
    category: str,
) -> (
    list[BoundaryDetectorPlugin] | list[TextRecognizerPlugin] | list[BookIdentifierPlugin] | list[ArchiveSearcherPlugin]
):
    """List the plugins of one category whose ``auto_queue`` flag is set.

    ``category`` may be given in singular or plural form. A name that matches no
    category yields an empty list.
    """
    if category in ("boundary_detectors", "boundary_detector"):
        return [plugin for plugin in _boundary_detectors.values() if plugin.auto_queue]
    if category in ("text_recognizers", "text_recognizer"):
        return [plugin for plugin in _text_recognizers.values() if plugin.auto_queue]
    if category in ("book_identifiers", "book_identifier"):
        return [plugin for plugin in _book_identifiers.values() if plugin.auto_queue]
    if category in ("archive_searchers", "archive_searcher"):
        return [plugin for plugin in _archive_searchers.values() if plugin.auto_queue]
    return []
def get_plugin(plugin_id: str) -> PluginLookupResult:
    """Look a plugin up by id and return it tagged with its category.

    Registries are searched in a fixed order: boundary detectors, text recognizers,
    book identifiers, archive searchers. The first hit is returned as a
    ``(category, plugin)`` tuple; ``(None, None)`` means no registry has the id.
    """
    detector = _boundary_detectors.get(plugin_id)
    if detector is not None:
        bd_hit: BDPluginResult = ("boundary_detector", detector)
        return bd_hit

    recognizer = _text_recognizers.get(plugin_id)
    if recognizer is not None:
        tr_hit: TRPluginResult = ("text_recognizer", recognizer)
        return tr_hit

    identifier = _book_identifiers.get(plugin_id)
    if identifier is not None:
        bi_hit: BIPluginResult = ("book_identifier", identifier)
        return bi_hit

    searcher = _archive_searchers.get(plugin_id)
    if searcher is not None:
        as_hit: ASPluginResult = ("archive_searcher", searcher)
        return as_hit

    missing: NotFoundResult = (None, None)
    return missing

View File

@@ -0,0 +1,21 @@
"""AI plugin classes using OpenAI-compatible APIs.
Submodules:
_client.py — shared AIClient + HTTP helpers (private)
boundary_detector_shelves.py — BoundaryDetectorShelvesPlugin (cabinet → shelf bounds)
boundary_detector_books.py — BoundaryDetectorBooksPlugin (shelf → book bounds)
text_recognizer.py — TextRecognizerPlugin (spine image → raw text + fields)
book_identifier.py — BookIdentifierPlugin (raw text → bibliographic metadata)
"""
from .boundary_detector_books import BoundaryDetectorBooksPlugin
from .boundary_detector_shelves import BoundaryDetectorShelvesPlugin
from .book_identifier import BookIdentifierPlugin
from .text_recognizer import TextRecognizerPlugin
__all__ = [
"BoundaryDetectorShelvesPlugin",
"BoundaryDetectorBooksPlugin",
"TextRecognizerPlugin",
"BookIdentifierPlugin",
]

View File

@@ -0,0 +1,94 @@
"""Internal OpenAI-compatible HTTP client shared by all AI plugins.
Caches openai.OpenAI instances per (base_url, api_key) to avoid re-creating on each call.
AIClient wraps the raw API call: fills prompt template, encodes images, parses JSON response.
"""
import json
import re
from string import Template
from typing import Any, cast
import openai
from openai.types.chat import ChatCompletionMessageParam
from openai.types.chat.chat_completion_content_part_image_param import (
ChatCompletionContentPartImageParam,
ImageURL,
)
from openai.types.chat.chat_completion_content_part_text_param import ChatCompletionContentPartTextParam
from models import AIConfig
# One openai.OpenAI instance per (base_url, api_key) pair, reused across calls.
_clients: dict[tuple[str, str], openai.OpenAI] = {}


def _get_client(base_url: str, api_key: str) -> openai.OpenAI:
    """Return the cached client for this endpoint/key pair, creating it on first request."""
    cache_key = (base_url, api_key)
    client = _clients.get(cache_key)
    if client is None:
        client = openai.OpenAI(base_url=base_url, api_key=api_key)
        _clients[cache_key] = client
    return client
def _parse_json(text: str) -> dict[str, Any]:
"""Extract and parse the first JSON object found in text.
Raises ValueError if no JSON object is found or the JSON is malformed.
"""
text = text.strip()
m = re.search(r"\{.*\}", text, re.DOTALL)
if not m:
raise ValueError(f"No JSON object found in AI response: {text[:200]!r}")
try:
result = json.loads(m.group())
except json.JSONDecodeError as exc:
raise ValueError(f"Failed to parse AI response as JSON: {exc}") from exc
if not isinstance(result, dict):
raise ValueError(f"Expected JSON object, got {type(result).__name__}")
return cast(dict[str, Any], result)
ContentPart = ChatCompletionContentPartImageParam | ChatCompletionContentPartTextParam


class AIClient:
    """Chat-completion caller tied to one provider config and one output schema.

    ``call`` reads ``base_url``, ``api_key``, ``model`` and ``extra_body`` from
    ``cfg``; the plugins additionally read ``max_image_px`` and
    ``confidence_threshold`` from it. ``output_format`` is the JSON example string
    substituted for ``${OUTPUT_FORMAT}`` in every prompt.
    """

    def __init__(self, cfg: AIConfig, output_format: str):
        self.output_format = output_format
        self.cfg = cfg

    def call(
        self,
        prompt_template: str,
        images: list[tuple[str, str]],
        text_vars: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Render the prompt, send it with any images, and return the reply parsed as JSON.

        images: ``(base64_str, mime_type)`` pairs, each sent as a ``data:`` URL ahead of the prompt text.
        text_vars: additional ``${KEY}`` substitutions; they are applied on top of ``${OUTPUT_FORMAT}``.
        Placeholders with no matching variable are left in the prompt untouched.
        """
        substitutions: dict[str, str] = {"OUTPUT_FORMAT": self.output_format, **(text_vars or {})}
        rendered = Template(prompt_template).safe_substitute(substitutions)

        api = _get_client(self.cfg["base_url"], self.cfg["api_key"])

        # Images first, prompt text last, all inside a single user message.
        content: list[ContentPart] = []
        for encoded, mime in images:
            data_url = f"data:{mime};base64,{encoded}"
            content.append(ChatCompletionContentPartImageParam(type="image_url", image_url=ImageURL(url=data_url)))
        content.append(ChatCompletionContentPartTextParam(type="text", text=rendered))

        conversation: list[ChatCompletionMessageParam] = [{"role": "user", "content": content}]
        response = api.chat.completions.create(
            model=self.cfg["model"],
            max_tokens=2048,
            messages=conversation,
            extra_body=self.cfg["extra_body"],
        )
        # A missing message body is treated as an empty string.
        reply_text = response.choices[0].message.content or ""
        return _parse_json(reply_text)

View File

@@ -0,0 +1,56 @@
"""Book identifier plugin — raw spine text → bibliographic metadata.
Input: raw_text string (from text_recognizer).
Output: {"title": "...", "author": "...", "year": "...", "isbn": "...",
"publisher": "...", "confidence": 0.95}
confidence — float 0-1; results below confidence_threshold are discarded by logic.py.
Result added to books.candidates and books.ai_* fields.
"""
from models import AIConfig, AIIdentifyResult
from ._client import AIClient
class BookIdentifierPlugin:
    """Turns raw spine text into bibliographic fields by prompting an AI model."""

    category = "book_identifiers"
    OUTPUT_FORMAT = (
        '{"title": "...", "author": "...", "year": "...", '
        '"isbn": "...", "publisher": "...", "confidence": 0.95}'
    )

    def __init__(
        self,
        plugin_id: str,
        name: str,
        ai_config: AIConfig,
        prompt_text: str,
        auto_queue: bool,
        rate_limit_seconds: float,
    ):
        self._prompt_text = prompt_text
        self._client = AIClient(ai_config, self.OUTPUT_FORMAT)
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.name = name
        self.plugin_id = plugin_id

    def identify(self, raw_text: str) -> AIIdentifyResult:
        """Ask the model to identify the book described by ``raw_text``.

        The text is passed to the prompt as ``${RAW_TEXT}``; no images are sent.
        Missing or empty fields in the reply become ``""``. ``confidence`` is set
        on the result only when the reply includes one.
        """
        reply = self._client.call(self._prompt_text, [], text_vars={"RAW_TEXT": raw_text})

        def text_field(key: str) -> str:
            return str(reply.get(key) or "")

        identified = AIIdentifyResult(
            title=text_field("title"),
            author=text_field("author"),
            year=text_field("year"),
            isbn=text_field("isbn"),
            publisher=text_field("publisher"),
        )
        if (score := reply.get("confidence")) is not None:
            identified["confidence"] = float(score)
        return identified

    @property
    def confidence_threshold(self) -> float:
        """Confidence threshold taken from this plugin's AI config."""
        return self._client.cfg["confidence_threshold"]

View File

@@ -0,0 +1,46 @@
"""Boundary detector plugin for book spine detection.
Input: shelf image (full or cropped from cabinet photo).
Output: {"boundaries": [x0, x1, ...]}
boundaries — interior x-fractions (0=left, 1=right), excluding 0 and 1.
Results stored in shelves.ai_book_boundaries[plugin_id].
"""
from models import AIConfig, BoundaryDetectResult
from ._client import AIClient
class BoundaryDetectorBooksPlugin:
    """Asks an AI model for the book-spine boundaries in a shelf image."""

    category = "boundary_detectors"
    target = "books"  # operates on shelf images; stored in ai_book_boundaries
    OUTPUT_FORMAT = '{"boundaries": [0.08, 0.16, 0.24, 0.32]}'

    def __init__(
        self,
        plugin_id: str,
        name: str,
        ai_config: AIConfig,
        prompt_text: str,
        auto_queue: bool,
        rate_limit_seconds: float,
    ):
        self._prompt_text = prompt_text
        self._client = AIClient(ai_config, self.OUTPUT_FORMAT)
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.name = name
        self.plugin_id = plugin_id

    def detect(self, image_b64: str, image_mime: str) -> BoundaryDetectResult:
        """Send one image to the model and return its ``boundaries`` as floats.

        Entries of the reply's ``boundaries`` list that are not ints or floats are
        dropped; a missing or empty list yields no boundaries.
        """
        reply = self._client.call(self._prompt_text, [(image_b64, image_mime)])
        candidates: list[object] = reply.get("boundaries") or []
        boundaries: list[float] = []
        for value in candidates:
            if isinstance(value, (int, float)):
                boundaries.append(float(value))
        return BoundaryDetectResult(boundaries=boundaries)

    @property
    def max_image_px(self) -> int:
        """Image-size limit taken from this plugin's AI config."""
        return self._client.cfg["max_image_px"]

View File

@@ -0,0 +1,51 @@
"""Boundary detector plugin for shelf detection.
Input: cabinet photo (full image).
Output: {"boundaries": [y0, y1, ...], "confidence": 0.x}
boundaries — interior y-fractions (0=top, 1=bottom), excluding 0 and 1.
confidence — optional float 0-1.
Results stored in cabinets.ai_shelf_boundaries[plugin_id].
"""
from models import AIConfig, BoundaryDetectResult
from ._client import AIClient
class BoundaryDetectorShelvesPlugin:
    """Asks an AI model for the shelf boundaries in a cabinet photo."""

    category = "boundary_detectors"
    target = "shelves"  # operates on cabinet images; stored in ai_shelf_boundaries
    OUTPUT_FORMAT = '{"boundaries": [0.24, 0.48, 0.72], "confidence": 0.92}'

    def __init__(
        self,
        plugin_id: str,
        name: str,
        ai_config: AIConfig,
        prompt_text: str,
        auto_queue: bool,
        rate_limit_seconds: float,
    ):
        self._prompt_text = prompt_text
        self._client = AIClient(ai_config, self.OUTPUT_FORMAT)
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.name = name
        self.plugin_id = plugin_id

    def detect(self, image_b64: str, image_mime: str) -> BoundaryDetectResult:
        """Send one image to the model and return its ``boundaries`` plus optional ``confidence``.

        Entries of the reply's ``boundaries`` list that are not ints or floats are
        dropped. ``confidence`` is set on the result only when the reply includes one.
        """
        reply = self._client.call(self._prompt_text, [(image_b64, image_mime)])
        candidates: list[object] = reply.get("boundaries") or []
        boundaries: list[float] = []
        for value in candidates:
            if isinstance(value, (int, float)):
                boundaries.append(float(value))

        detected = BoundaryDetectResult(boundaries=boundaries)
        if (score := reply.get("confidence")) is not None:
            detected["confidence"] = float(score)
        return detected

    @property
    def max_image_px(self) -> int:
        """Image-size limit taken from this plugin's AI config."""
        return self._client.cfg["max_image_px"]

View File

@@ -0,0 +1,56 @@
"""Text recognizer plugin — spine image → raw text + structured fields.
Input: book spine image.
Output: {"raw_text": "...", "title": "...", "author": "...", "year": "...",
"publisher": "...", "other": "..."}
raw_text — all visible text verbatim, line-break separated.
other fields — VLM interpretation of raw_text.
Result added to books.candidates and books.raw_text.
"""
from models import AIConfig, TextRecognizeResult
from ._client import AIClient
class TextRecognizerPlugin:
    """Asks an AI model to read the text on a book-spine image."""

    category = "text_recognizers"
    OUTPUT_FORMAT = (
        '{"raw_text": "The Great Gatsby\\nF. Scott Fitzgerald\\nScribner", '
        '"title": "The Great Gatsby", "author": "F. Scott Fitzgerald", '
        '"year": "", "publisher": "Scribner", "other": ""}'
    )

    def __init__(
        self,
        plugin_id: str,
        name: str,
        ai_config: AIConfig,
        prompt_text: str,
        auto_queue: bool,
        rate_limit_seconds: float,
    ):
        self._prompt_text = prompt_text
        self._client = AIClient(ai_config, self.OUTPUT_FORMAT)
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.name = name
        self.plugin_id = plugin_id

    def recognize(self, image_b64: str, image_mime: str) -> TextRecognizeResult:
        """Send one image to the model and return the recognised text fields.

        Every field (raw_text, title, author, year, publisher, other) is a string;
        a field that is missing or empty in the reply becomes ``""``.
        """
        reply = self._client.call(self._prompt_text, [(image_b64, image_mime)])

        def text_field(key: str) -> str:
            return str(reply.get(key) or "")

        return TextRecognizeResult(
            raw_text=text_field("raw_text"),
            title=text_field("title"),
            author=text_field("author"),
            year=text_field("year"),
            publisher=text_field("publisher"),
            other=text_field("other"),
        )

    @property
    def max_image_px(self) -> int:
        """Image-size limit taken from this plugin's AI config."""
        return self._client.cfg["max_image_px"]

View File

View File

@@ -0,0 +1,121 @@
"""Config-driven HTML scraper for archive sites (rusneb, alib, shpl, etc.)."""
import re
from typing import Any
from urllib.parse import urlparse
import httpx
from models import CandidateRecord
from ..rate_limiter import RateLimiter
_YEAR_RE = re.compile(r"\b(1[0-9]{3}|20[012][0-9])\b")
def _cls_re(cls_frag: str, min_len: int = 3, max_len: int = 120) -> re.Pattern[str]:
return re.compile(rf'class="[^"]*{re.escape(cls_frag)}[^"]*"[^>]*>([^<]{{{min_len},{max_len}}})<')
class HtmlScraperPlugin:
    """
    Archive searcher that scrapes an HTML results page using regexes chosen by config.

    Config keys read by this class:
    url — search URL
    search_param — name of the query parameter that carries the search text
    extra_params — dict of fixed query parameters sent with every request
    title_class — class fragment of title elements (class strategy, default "title")
    author_class — class fragment of author elements (default "author")
    link_href_pattern — href regex locating title <a> links (link strategy)
    brief_class — class fragment of brief record rows (brief strategy)
    """

    category = "archive_searchers"

    def __init__(
        self,
        plugin_id: str,
        name: str,
        rate_limiter: RateLimiter,
        rate_limit_seconds: float,
        auto_queue: bool,
        timeout: int,
        config: dict[str, Any],
    ):
        self.plugin_id = plugin_id
        self.name = name
        self._rl = rate_limiter
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.timeout = timeout
        self.config = config
        # Rate-limit key: host of the configured URL, or the plugin id when there is none.
        host = urlparse(str(config.get("url") or "")).netloc
        self._domain: str = host or plugin_id

    def search(self, query: str) -> list[CandidateRecord]:
        """Fetch the search page for ``query`` and return up to three candidates.

        The parsing strategy depends on which config key is present:
        ``link_href_pattern`` first, then ``brief_class``, otherwise the
        class-based title/author strategy.
        """
        cfg = self.config
        self._rl.wait_and_record(self._domain, self.rate_limit_seconds)

        query_params: dict[str, Any] = dict(cfg.get("extra_params") or {})
        query_params[cfg["search_param"]] = query
        response = httpx.get(
            cfg["url"],
            params=query_params,
            timeout=self.timeout,
            headers={"User-Agent": "Mozilla/5.0"},
        )
        page = response.text
        # Years are collected from the whole page and matched to titles by position.
        years = _YEAR_RE.findall(page)

        if "link_href_pattern" in cfg:
            parse = self._parse_link
        elif "brief_class" in cfg:
            parse = self._parse_brief
        else:
            parse = self._parse_class
        return parse(page, years, cfg)

    def _records(self, titles: list[str], authors: list[str], years: list[str]) -> list[CandidateRecord]:
        """Zip titles with same-index authors and years; a missing author or year becomes ""."""
        records: list[CandidateRecord] = []
        for idx, title in enumerate(titles):
            author = authors[idx].strip() if idx < len(authors) else ""
            year = years[idx] if idx < len(years) else ""
            records.append(
                CandidateRecord(
                    source=self.plugin_id,
                    title=title.strip(),
                    author=author,
                    year=year,
                    isbn="",
                    publisher="",
                )
            )
        return records

    def _parse_class(self, html: str, years: list[str], cfg: dict[str, Any]) -> list[CandidateRecord]:
        """Class strategy: titles and authors are both located by class fragment."""
        titles = _cls_re(cfg.get("title_class", "title")).findall(html)[:3]
        authors = _cls_re(cfg.get("author_class", "author"), 3, 80).findall(html)[:3]
        return self._records(titles, authors, years)

    def _parse_link(self, html: str, years: list[str], cfg: dict[str, Any]) -> list[CandidateRecord]:
        """Link strategy: titles are the text of <a> tags whose href matches the configured regex."""
        href_pat = cfg.get("link_href_pattern", r"")
        titles = re.findall(rf'<a[^>]+href="[^"]*{href_pat}[^"]*"[^>]*>([^<]{{3,120}})</a>', html)[:3]
        authors = _cls_re(cfg.get("author_class", "author"), 3, 80).findall(html)[:3]
        return self._records(titles, authors, years)

    def _parse_brief(self, html: str, years: list[str], cfg: dict[str, Any]) -> list[CandidateRecord]:
        """Brief strategy: each brief row's text is used as the title; the author is left empty."""
        titles = _cls_re(cfg.get("brief_class", "brief"), 3, 120).findall(html)[:3]
        return self._records(titles, [], years)

View File

@@ -0,0 +1,54 @@
"""OpenLibrary JSON search API plugin (openlibrary.org/search.json)."""
from typing import Any
import httpx
from models import CandidateRecord
from ..rate_limiter import RateLimiter
_DOMAIN = "openlibrary.org"


class OpenLibraryPlugin:
    """Archive searcher backed by the OpenLibrary ``search.json`` endpoint."""

    category = "archive_searchers"

    def __init__(
        self,
        plugin_id: str,
        name: str,
        rate_limiter: RateLimiter,
        rate_limit_seconds: float,
        auto_queue: bool,
        timeout: int,
        config: dict[str, Any],
    ):
        # ``config`` is accepted so all archive plugins share one constructor signature; it is not stored.
        self.timeout = timeout
        self.auto_queue = auto_queue
        self.rate_limit_seconds = rate_limit_seconds
        self._rl = rate_limiter
        self.name = name
        self.plugin_id = plugin_id

    def search(self, query: str) -> list[CandidateRecord]:
        """Query OpenLibrary and convert the first three returned docs into candidates.

        Authors are joined with ", "; only the first ISBN and first publisher of a
        doc are used. Absent fields become "".
        """
        self._rl.wait_and_record(_DOMAIN, self.rate_limit_seconds)
        response = httpx.get(
            "https://openlibrary.org/search.json",
            params={"q": query, "limit": 5, "fields": "title,author_name,first_publish_year,isbn,publisher"},
            timeout=self.timeout,
        )
        docs: list[dict[str, Any]] = response.json().get("docs", [])
        return [
            CandidateRecord(
                source=self.plugin_id,
                title=(str(doc.get("title") or "")).strip(),
                author=", ".join(doc.get("author_name") or []).strip(),
                year=str(doc.get("first_publish_year") or "").strip(),
                isbn=((doc.get("isbn") or [""])[0]).strip(),
                publisher=((doc.get("publisher") or [""])[0]).strip(),
            )
            for doc in docs[:3]
        ]

View File

@@ -0,0 +1,59 @@
"""RSL (Russian State Library) AJAX JSON search API plugin (search.rsl.ru)."""
from typing import Any
import httpx
from models import CandidateRecord
from ..rate_limiter import RateLimiter
_DOMAIN = "search.rsl.ru"


class RSLPlugin:
    """Archive searcher backed by the search.rsl.ru AJAX search endpoint."""

    category = "archive_searchers"

    def __init__(
        self,
        plugin_id: str,
        name: str,
        rate_limiter: RateLimiter,
        rate_limit_seconds: float,
        auto_queue: bool,
        timeout: int,
        config: dict[str, Any],
    ):
        # ``config`` is accepted so all archive plugins share one constructor signature; it is not stored.
        self.timeout = timeout
        self.auto_queue = auto_queue
        self.rate_limit_seconds = rate_limit_seconds
        self._rl = rate_limiter
        self.name = name
        self.plugin_id = plugin_id

    def search(self, query: str) -> list[CandidateRecord]:
        """Query RSL and convert the first three records into candidates.

        The record list is taken from the first non-empty of the reply's
        ``records``, ``items`` or ``data`` keys. Records without a title
        (``title`` or ``name``) are skipped.
        """
        self._rl.wait_and_record(_DOMAIN, self.rate_limit_seconds)
        response = httpx.get(
            "https://search.rsl.ru/site/ajax-search",
            params={"language": "ru", "q": query, "page": 1, "perPage": 5},
            timeout=self.timeout,
            headers={"Accept": "application/json"},
        )
        payload: dict[str, Any] = response.json()
        records: list[dict[str, Any]] = payload.get("records") or payload.get("items") or payload.get("data") or []

        candidates: list[CandidateRecord] = []
        for record in records[:3]:
            title = (str(record.get("title") or record.get("name") or "")).strip()
            if title:
                candidates.append(
                    CandidateRecord(
                        source=self.plugin_id,
                        title=title,
                        author=(str(record.get("author") or record.get("authors") or "")).strip(),
                        year=str(record.get("year") or record.get("pubyear") or "").strip(),
                        isbn=(str(record.get("isbn") or "")).strip(),
                        publisher=(str(record.get("publisher") or "")).strip(),
                    )
                )
        return candidates

View File

@@ -0,0 +1,71 @@
"""SRU XML catalog plugin (NLR and similar SRU-compliant catalogs)."""
import re
from typing import Any
from urllib.parse import urlparse
import httpx
from models import CandidateRecord
from ..rate_limiter import RateLimiter
class SRUCatalogPlugin:
    """
    Archive searcher for SRU endpoints that return Dublin Core records.

    Config keys read by this class:
    url — SRU endpoint URL
    query_prefix — text placed directly in front of the search term (e.g. 'title=')
    """

    category = "archive_searchers"

    def __init__(
        self,
        plugin_id: str,
        name: str,
        rate_limiter: RateLimiter,
        rate_limit_seconds: float,
        auto_queue: bool,
        timeout: int,
        config: dict[str, Any],
    ):
        self.plugin_id = plugin_id
        self.name = name
        self._rl = rate_limiter
        self.rate_limit_seconds = rate_limit_seconds
        self.auto_queue = auto_queue
        self.timeout = timeout
        self.config = config
        # Rate-limit key: host of the configured URL, or the plugin id when there is none.
        host = urlparse(str(config.get("url") or "")).netloc
        self._domain: str = host or plugin_id

    def search(self, query: str) -> list[CandidateRecord]:
        """Run a searchRetrieve request and return up to three candidates.

        Titles, creators and four-digit dates are pulled from the response text
        with regexes and combined by position; a title with no same-index creator
        or date gets "" for that field.
        """
        cfg = self.config
        self._rl.wait_and_record(self._domain, self.rate_limit_seconds)

        prefix = cfg.get("query_prefix", "")
        sru_query = f"{prefix}{query}"
        request_params = {
            "operation": "searchRetrieve",
            "version": "1.1",
            "query": sru_query,
            "maximumRecords": "5",
            "recordSchema": "dc",
        }
        response = httpx.get(
            cfg["url"],
            params=request_params,
            timeout=self.timeout,
            headers={"User-Agent": "Mozilla/5.0"},
        )

        titles = re.findall(r"<dc:title>([^<]+)</dc:title>", response.text)[:3]
        authors = re.findall(r"<dc:creator>([^<]+)</dc:creator>", response.text)[:3]
        years = re.findall(r"<dc:date>(\d{4})</dc:date>", response.text)[:3]

        candidates: list[CandidateRecord] = []
        for idx, title in enumerate(titles):
            author = authors[idx].strip() if idx < len(authors) else ""
            year = years[idx] if idx < len(years) else ""
            candidates.append(
                CandidateRecord(
                    source=self.plugin_id,
                    title=title.strip(),
                    author=author,
                    year=year,
                    isbn="",
                    publisher="",
                )
            )
        return candidates

View File

@@ -0,0 +1,23 @@
"""Thread-safe in-memory per-domain rate limiter shared across all archive plugin threads."""
import time
from threading import Lock
class RateLimiter:
    """Thread-safe per-domain rate limiter. Shared across all archive plugin threads.

    For each domain it stores the earliest moment at which the next request may
    start. Times are read from ``time.monotonic()``, so system clock adjustments
    cannot stretch or cancel a pending wait. State is kept in memory only.
    """

    def __init__(self) -> None:
        self._lock = Lock()
        # domain -> monotonic timestamp of the next free request slot
        self._next: dict[str, float] = {}

    def wait_and_record(self, domain: str, rate_s: float) -> None:
        """Block until this caller's slot for ``domain`` arrives, reserving the following slot.

        domain: key under which requests are throttled together.
        rate_s: minimum spacing between requests in seconds; a value <= 0 disables
            limiting for this call and leaves the stored state untouched.

        The slot is reserved while holding the lock, but the sleep happens after
        releasing it, so callers waiting on one domain do not hold up callers for
        another.
        """
        if rate_s <= 0:
            return
        with self._lock:
            now = time.monotonic()
            # A domain seen for the first time is free immediately.
            next_allowed = self._next.get(domain, now)
            delay = next_allowed - now
            # Queue behind any slot already reserved; otherwise start counting from now.
            self._next[domain] = max(now, next_allowed) + rate_s
        if delay > 0:
            time.sleep(delay)