""" Database layer: schema, connection/transaction lifecycle, and all query functions. No file I/O, no config, no business logic. All SQL lives here. """ import json import sqlite3 import time import uuid from collections.abc import Iterator from contextlib import contextmanager from datetime import datetime from pathlib import Path from typing import Any from mashumaro.codecs import BasicDecoder from models import BookRow, CabinetRow, RoomRow, ShelfRow DB_PATH = Path("data") / "books.db" # ── Schema ───────────────────────────────────────────────────────────────────── SCHEMA = """ CREATE TABLE IF NOT EXISTS rooms ( id TEXT PRIMARY KEY, name TEXT NOT NULL, position INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS cabinets ( id TEXT PRIMARY KEY, room_id TEXT NOT NULL REFERENCES rooms(id) ON DELETE CASCADE, name TEXT NOT NULL, photo_filename TEXT, shelf_boundaries TEXT DEFAULT NULL, ai_shelf_boundaries TEXT DEFAULT NULL, position INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS shelves ( id TEXT PRIMARY KEY, cabinet_id TEXT NOT NULL REFERENCES cabinets(id) ON DELETE CASCADE, name TEXT NOT NULL, photo_filename TEXT, book_boundaries TEXT DEFAULT NULL, ai_book_boundaries TEXT DEFAULT NULL, position INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS books ( id TEXT PRIMARY KEY, shelf_id TEXT NOT NULL REFERENCES shelves(id) ON DELETE CASCADE, position INTEGER NOT NULL DEFAULT 0, image_filename TEXT, title TEXT DEFAULT '', author TEXT DEFAULT '', year TEXT DEFAULT '', isbn TEXT DEFAULT '', publisher TEXT DEFAULT '', notes TEXT DEFAULT '', raw_text TEXT DEFAULT '', ai_title TEXT DEFAULT '', ai_author TEXT DEFAULT '', ai_year TEXT DEFAULT '', ai_isbn TEXT DEFAULT '', ai_publisher TEXT DEFAULT '', identification_status TEXT DEFAULT 'unidentified', title_confidence REAL DEFAULT 0, analyzed_at TEXT, created_at TEXT NOT NULL, candidates TEXT DEFAULT NULL, ai_blocks TEXT DEFAULT NULL ); CREATE TABLE IF NOT EXISTS ai_log ( id TEXT PRIMARY KEY, ts REAL NOT NULL, plugin_id TEXT NOT NULL, entity_type TEXT NOT NULL, entity_id TEXT NOT NULL, model TEXT NOT NULL, request TEXT NOT NULL DEFAULT '', status TEXT NOT NULL DEFAULT 'running', response TEXT NOT NULL DEFAULT '', duration_ms INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS batch_queue ( book_id TEXT PRIMARY KEY, added_at REAL NOT NULL ); """ # ── Mashumaro decoders for entity rows ──────────────────────────────────────── _room_dec: BasicDecoder[RoomRow] = BasicDecoder(RoomRow) _cabinet_dec: BasicDecoder[CabinetRow] = BasicDecoder(CabinetRow) _shelf_dec: BasicDecoder[ShelfRow] = BasicDecoder(ShelfRow) _book_dec: BasicDecoder[BookRow] = BasicDecoder(BookRow) def _room(row: sqlite3.Row) -> RoomRow: return _room_dec.decode(dict(row)) def _cabinet(row: sqlite3.Row) -> CabinetRow: return _cabinet_dec.decode(dict(row)) def _shelf(row: sqlite3.Row) -> ShelfRow: return _shelf_dec.decode(dict(row)) def _book(row: sqlite3.Row) -> BookRow: return _book_dec.decode(dict(row)) # ── DB init + connection ──────────────────────────────────────────────────────── def init_db() -> None: DB_PATH.parent.mkdir(exist_ok=True) c = conn() c.executescript(SCHEMA) c.commit() c.close() def conn() -> sqlite3.Connection: c = sqlite3.connect(DB_PATH) c.row_factory = sqlite3.Row c.execute("PRAGMA foreign_keys = ON") return c # ── Context managers ────────────────────────────────────────────────────────── @contextmanager def connection() -> Iterator[sqlite3.Connection]: """Read-only context: opens a connection, closes on exit.""" c = conn() try: yield c finally: c.close() @contextmanager def transaction() -> Iterator[sqlite3.Connection]: """Write context: opens, commits on success, rolls back on exception.""" c = conn() try: yield c c.commit() except Exception: c.rollback() raise finally: c.close() # ── Helpers ─────────────────────────────────────────────────────────────────── COUNTERS: dict[str, int] = {} def uid() -> str: return str(uuid.uuid4()) def now() -> str: return datetime.now().isoformat() def next_pos(db: sqlite3.Connection, table: str, parent_col: str, parent_id: str) -> int: row = db.execute(f"SELECT COALESCE(MAX(position),0)+1 FROM {table} WHERE {parent_col}=?", [parent_id]).fetchone() return int(row[0]) def next_root_pos(db: sqlite3.Connection, table: str) -> int: row = db.execute(f"SELECT COALESCE(MAX(position),0)+1 FROM {table}").fetchone() return int(row[0]) def next_name(prefix: str) -> str: COUNTERS[prefix] = COUNTERS.get(prefix, 0) + 1 return f"{prefix} {COUNTERS[prefix]}" # ── Tree ────────────────────────────────────────────────────────────────────── def get_tree(db: sqlite3.Connection) -> list[dict[str, object]]: """Build and return the full nested Room→Cabinet→Shelf→Book tree.""" rooms: list[dict[str, object]] = [dict(r) for r in db.execute("SELECT * FROM rooms ORDER BY position")] for room in rooms: cabs: list[dict[str, object]] = [ dict(c) for c in db.execute("SELECT * FROM cabinets WHERE room_id=? ORDER BY position", [room["id"]]) ] for cab in cabs: shelves: list[dict[str, object]] = [ dict(s) for s in db.execute("SELECT * FROM shelves WHERE cabinet_id=? ORDER BY position", [cab["id"]]) ] for shelf in shelves: shelf["books"] = [ dict(b) for b in db.execute("SELECT * FROM books WHERE shelf_id=? ORDER BY position", [shelf["id"]]) ] cab["shelves"] = shelves room["cabinets"] = cabs return rooms # ── Rooms ───────────────────────────────────────────────────────────────────── def get_room(db: sqlite3.Connection, room_id: str) -> RoomRow | None: row = db.execute("SELECT * FROM rooms WHERE id=?", [room_id]).fetchone() return _room(row) if row else None def create_room(db: sqlite3.Connection) -> RoomRow: data = {"id": uid(), "name": next_name("Room"), "position": next_root_pos(db, "rooms"), "created_at": now()} db.execute("INSERT INTO rooms VALUES(:id,:name,:position,:created_at)", data) return _room_dec.decode(data) def rename_room(db: sqlite3.Connection, room_id: str, name: str) -> None: db.execute("UPDATE rooms SET name=? WHERE id=?", [name, room_id]) def collect_room_photos(db: sqlite3.Connection, room_id: str) -> list[str]: """Return all photo filenames for cabinets/shelves/books under this room.""" photos: list[str] = [] for r in db.execute( "SELECT image_filename FROM books WHERE shelf_id IN " "(SELECT id FROM shelves WHERE cabinet_id IN (SELECT id FROM cabinets WHERE room_id=?))", [room_id], ): if r[0]: photos.append(str(r[0])) for r in db.execute( "SELECT photo_filename FROM shelves WHERE cabinet_id IN (SELECT id FROM cabinets WHERE room_id=?)", [room_id] ): if r[0]: photos.append(str(r[0])) for r in db.execute("SELECT photo_filename FROM cabinets WHERE room_id=?", [room_id]): if r[0]: photos.append(str(r[0])) return photos def delete_room(db: sqlite3.Connection, room_id: str) -> None: """Delete room; SQLite ON DELETE CASCADE removes all children.""" db.execute("DELETE FROM rooms WHERE id=?", [room_id]) # ── Cabinets ────────────────────────────────────────────────────────────────── def get_cabinet(db: sqlite3.Connection, cabinet_id: str) -> CabinetRow | None: row = db.execute("SELECT * FROM cabinets WHERE id=?", [cabinet_id]).fetchone() return _cabinet(row) if row else None def create_cabinet(db: sqlite3.Connection, room_id: str) -> CabinetRow: data: dict[str, object] = { "id": uid(), "room_id": room_id, "name": next_name("Cabinet"), "photo_filename": None, "shelf_boundaries": None, "ai_shelf_boundaries": None, "position": next_pos(db, "cabinets", "room_id", room_id), "created_at": now(), } db.execute( "INSERT INTO cabinets VALUES(" ":id,:room_id,:name,:photo_filename,:shelf_boundaries," ":ai_shelf_boundaries,:position,:created_at)", data, ) return _cabinet_dec.decode(data) def rename_cabinet(db: sqlite3.Connection, cabinet_id: str, name: str) -> None: db.execute("UPDATE cabinets SET name=? WHERE id=?", [name, cabinet_id]) def collect_cabinet_photos(db: sqlite3.Connection, cabinet_id: str) -> list[str]: photos: list[str] = [] for r in db.execute( "SELECT image_filename FROM books WHERE shelf_id IN (SELECT id FROM shelves WHERE cabinet_id=?)", [cabinet_id] ): if r[0]: photos.append(str(r[0])) for r in db.execute("SELECT photo_filename FROM shelves WHERE cabinet_id=?", [cabinet_id]): if r[0]: photos.append(str(r[0])) row = db.execute("SELECT photo_filename FROM cabinets WHERE id=?", [cabinet_id]).fetchone() if row and row[0]: photos.append(str(row[0])) return photos def delete_cabinet(db: sqlite3.Connection, cabinet_id: str) -> None: db.execute("DELETE FROM cabinets WHERE id=?", [cabinet_id]) def get_cabinet_photo(db: sqlite3.Connection, cabinet_id: str) -> str | None: row = db.execute("SELECT photo_filename FROM cabinets WHERE id=?", [cabinet_id]).fetchone() return str(row[0]) if row and row[0] else None def set_cabinet_photo(db: sqlite3.Connection, cabinet_id: str, filename: str) -> None: db.execute("UPDATE cabinets SET photo_filename=? WHERE id=?", [filename, cabinet_id]) def set_cabinet_boundaries(db: sqlite3.Connection, cabinet_id: str, boundaries_json: str) -> None: db.execute("UPDATE cabinets SET shelf_boundaries=? WHERE id=?", [boundaries_json, cabinet_id]) def set_ai_shelf_boundaries(db: sqlite3.Connection, cabinet_id: str, plugin_id: str, boundaries: list[float]) -> None: row = db.execute("SELECT ai_shelf_boundaries FROM cabinets WHERE id=?", [cabinet_id]).fetchone() current: dict[str, object] = json.loads(row[0]) if row and row[0] else {} current[plugin_id] = boundaries db.execute("UPDATE cabinets SET ai_shelf_boundaries=? WHERE id=?", [json.dumps(current), cabinet_id]) # ── Shelves ─────────────────────────────────────────────────────────────────── def get_shelf(db: sqlite3.Connection, shelf_id: str) -> ShelfRow | None: row = db.execute("SELECT * FROM shelves WHERE id=?", [shelf_id]).fetchone() return _shelf(row) if row else None def create_shelf(db: sqlite3.Connection, cabinet_id: str) -> ShelfRow: data: dict[str, object] = { "id": uid(), "cabinet_id": cabinet_id, "name": next_name("Shelf"), "photo_filename": None, "book_boundaries": None, "ai_book_boundaries": None, "position": next_pos(db, "shelves", "cabinet_id", cabinet_id), "created_at": now(), } db.execute( "INSERT INTO shelves VALUES(" ":id,:cabinet_id,:name,:photo_filename,:book_boundaries,:ai_book_boundaries,:position,:created_at)", data, ) return _shelf_dec.decode(data) def rename_shelf(db: sqlite3.Connection, shelf_id: str, name: str) -> None: db.execute("UPDATE shelves SET name=? WHERE id=?", [name, shelf_id]) def collect_shelf_photos(db: sqlite3.Connection, shelf_id: str) -> list[str]: photos: list[str] = [] row = db.execute("SELECT photo_filename FROM shelves WHERE id=?", [shelf_id]).fetchone() if row and row[0]: photos.append(str(row[0])) for r in db.execute("SELECT image_filename FROM books WHERE shelf_id=?", [shelf_id]): if r[0]: photos.append(str(r[0])) return photos def delete_shelf(db: sqlite3.Connection, shelf_id: str) -> None: db.execute("DELETE FROM shelves WHERE id=?", [shelf_id]) def get_shelf_photo(db: sqlite3.Connection, shelf_id: str) -> str | None: row = db.execute("SELECT photo_filename FROM shelves WHERE id=?", [shelf_id]).fetchone() return str(row[0]) if row and row[0] else None def set_shelf_photo(db: sqlite3.Connection, shelf_id: str, filename: str) -> None: db.execute("UPDATE shelves SET photo_filename=? WHERE id=?", [filename, shelf_id]) def set_shelf_boundaries(db: sqlite3.Connection, shelf_id: str, boundaries_json: str) -> None: db.execute("UPDATE shelves SET book_boundaries=? WHERE id=?", [boundaries_json, shelf_id]) def set_ai_book_boundaries(db: sqlite3.Connection, shelf_id: str, plugin_id: str, boundaries: list[float]) -> None: row = db.execute("SELECT ai_book_boundaries FROM shelves WHERE id=?", [shelf_id]).fetchone() current: dict[str, object] = json.loads(row[0]) if row and row[0] else {} current[plugin_id] = boundaries db.execute("UPDATE shelves SET ai_book_boundaries=? WHERE id=?", [json.dumps(current), shelf_id]) def get_shelf_rank(db: sqlite3.Connection, shelf_id: str) -> int: """0-based rank of shelf among its siblings sorted by position.""" row = db.execute("SELECT cabinet_id FROM shelves WHERE id=?", [shelf_id]).fetchone() if not row: return 0 siblings = [r[0] for r in db.execute("SELECT id FROM shelves WHERE cabinet_id=? ORDER BY position", [row[0]])] return siblings.index(shelf_id) if shelf_id in siblings else 0 # ── Books ───────────────────────────────────────────────────────────────────── def get_book(db: sqlite3.Connection, book_id: str) -> BookRow | None: row = db.execute("SELECT * FROM books WHERE id=?", [book_id]).fetchone() return _book(row) if row else None def create_book(db: sqlite3.Connection, shelf_id: str) -> BookRow: data: dict[str, object] = { "id": uid(), "shelf_id": shelf_id, "position": next_pos(db, "books", "shelf_id", shelf_id), "image_filename": None, "title": "", "author": "", "year": "", "isbn": "", "publisher": "", "notes": "", "raw_text": "", "ai_title": "", "ai_author": "", "ai_year": "", "ai_isbn": "", "ai_publisher": "", "identification_status": "unidentified", "title_confidence": 0, "analyzed_at": None, "created_at": now(), "candidates": None, "ai_blocks": None, } db.execute( "INSERT INTO books VALUES(:id,:shelf_id,:position,:image_filename,:title,:author,:year,:isbn,:publisher," ":notes,:raw_text,:ai_title,:ai_author,:ai_year,:ai_isbn,:ai_publisher,:identification_status," ":title_confidence,:analyzed_at,:created_at,:candidates,:ai_blocks)", data, ) return _book_dec.decode(data) def delete_book(db: sqlite3.Connection, book_id: str) -> None: db.execute("DELETE FROM books WHERE id=?", [book_id]) def get_book_photo(db: sqlite3.Connection, book_id: str) -> str | None: row = db.execute("SELECT image_filename FROM books WHERE id=?", [book_id]).fetchone() return str(row[0]) if row and row[0] else None def set_book_photo(db: sqlite3.Connection, book_id: str, filename: str) -> None: db.execute("UPDATE books SET image_filename=? WHERE id=?", [filename, book_id]) def set_user_book_fields( db: sqlite3.Connection, book_id: str, title: str, author: str, year: str, isbn: str, publisher: str, notes: str, ) -> None: """Set both user fields and ai_* fields (user edit is the authoritative identification).""" db.execute( "UPDATE books SET title=?,author=?,year=?,isbn=?,publisher=?,notes=?," "ai_title=?,ai_author=?,ai_year=?,ai_isbn=?,ai_publisher=? WHERE id=?", [title, author, year, isbn, publisher, notes, title, author, year, isbn, publisher, book_id], ) def set_book_status(db: sqlite3.Connection, book_id: str, status: str) -> None: db.execute("UPDATE books SET identification_status=? WHERE id=?", [status, book_id]) def set_book_confidence(db: sqlite3.Connection, book_id: str, confidence: float, analyzed_at: str) -> None: db.execute( "UPDATE books SET title_confidence=?, analyzed_at=? WHERE id=?", [confidence, analyzed_at, book_id], ) def set_book_ai_fields( db: sqlite3.Connection, book_id: str, ai_title: str, ai_author: str, ai_year: str, ai_isbn: str, ai_publisher: str, ) -> None: db.execute( "UPDATE books SET ai_title=?,ai_author=?,ai_year=?,ai_isbn=?,ai_publisher=? WHERE id=?", [ai_title, ai_author, ai_year, ai_isbn, ai_publisher, book_id], ) def set_book_ai_field(db: sqlite3.Connection, book_id: str, field: str, value: str) -> None: """Set a single ai_* field by name (used in dismiss_field logic).""" # field is validated by caller to be in AI_FIELDS db.execute(f"UPDATE books SET ai_{field}=? WHERE id=?", [value, book_id]) def set_book_raw_text(db: sqlite3.Connection, book_id: str, raw_text: str) -> None: db.execute("UPDATE books SET raw_text=? WHERE id=?", [raw_text, book_id]) def set_book_candidates(db: sqlite3.Connection, book_id: str, candidates_json: str) -> None: db.execute("UPDATE books SET candidates=? WHERE id=?", [candidates_json, book_id]) def set_book_ai_blocks(db: sqlite3.Connection, book_id: str, ai_blocks_json: str) -> None: db.execute("UPDATE books SET ai_blocks=? WHERE id=?", [ai_blocks_json, book_id]) def get_book_rank(db: sqlite3.Connection, book_id: str) -> int: """0-based rank of book among its siblings sorted by position.""" row = db.execute("SELECT shelf_id FROM books WHERE id=?", [book_id]).fetchone() if not row: return 0 siblings = [r[0] for r in db.execute("SELECT id FROM books WHERE shelf_id=? ORDER BY position", [row[0]])] return siblings.index(book_id) if book_id in siblings else 0 def get_unidentified_book_ids(db: sqlite3.Connection) -> list[str]: return [str(r[0]) for r in db.execute("SELECT id FROM books WHERE identification_status='unidentified'")] # ── Reorder ─────────────────────────────────────────────────────────────────── def reorder_entities(db: sqlite3.Connection, table: str, ids: list[str]) -> None: for i, entity_id in enumerate(ids, 1): db.execute(f"UPDATE {table} SET position=? WHERE id=?", [i, entity_id]) # ── AI log ──────────────────────────────────────────────────────────────────── def insert_ai_log_entry( db: sqlite3.Connection, entry_id: str, ts: float, plugin_id: str, entity_type: str, entity_id: str, model: str, request: str, ) -> None: """Insert a new AI log entry with status='running'.""" db.execute( "INSERT OR IGNORE INTO ai_log" " (id, ts, plugin_id, entity_type, entity_id, model, request) VALUES (?,?,?,?,?,?,?)", [entry_id, ts, plugin_id, entity_type, entity_id, model, request], ) def update_ai_log_entry(db: sqlite3.Connection, entry_id: str, status: str, response: str, duration_ms: int) -> None: """Update an AI log entry with the final status and response.""" db.execute( "UPDATE ai_log SET status=?, response=?, duration_ms=? WHERE id=?", [status, response, duration_ms, entry_id], ) def get_ai_log_entries(db: sqlite3.Connection, limit: int) -> list[dict[str, Any]]: """Return the most recent AI log entries, oldest first.""" rows = db.execute( "SELECT id, ts, plugin_id, entity_type, entity_id, model, request, status, response, duration_ms" " FROM ai_log ORDER BY ts DESC LIMIT ?", [limit], ).fetchall() return [dict(r) for r in reversed(rows)] # ── Batch queue ──────────────────────────────────────────────────────────────── def add_to_batch_queue(db: sqlite3.Connection, book_ids: list[str]) -> None: """Insert book IDs into the batch queue, ignoring duplicates. Args: db: Open database connection (must be writable). book_ids: Book IDs to enqueue. """ ts = time.time() db.executemany( "INSERT OR IGNORE INTO batch_queue (book_id, added_at) VALUES (?,?)", [(bid, ts) for bid in book_ids] ) def remove_from_batch_queue(db: sqlite3.Connection, book_id: str) -> None: """Remove a single book ID from the batch queue. Args: db: Open database connection (must be writable). book_id: Book ID to dequeue. """ db.execute("DELETE FROM batch_queue WHERE book_id=?", [book_id]) def get_batch_queue(db: sqlite3.Connection) -> list[str]: """Return all queued book IDs ordered by insertion time (oldest first). Args: db: Open database connection. Returns: List of book ID strings. """ rows = db.execute("SELECT book_id FROM batch_queue ORDER BY added_at").fetchall() return [str(r[0]) for r in rows] def clear_batch_queue(db: sqlite3.Connection) -> None: """Remove all entries from the batch queue. Args: db: Open database connection (must be writable). """ db.execute("DELETE FROM batch_queue")