diff --git a/src/angebote/cli.py b/src/angebote/cli.py index d8725bb..fb58f3a 100644 --- a/src/angebote/cli.py +++ b/src/angebote/cli.py @@ -55,6 +55,11 @@ def main(argv: list[str] | None = None) -> int: action="store_true", help="vor dem Lauf das OpenRouter-Modell interaktiv wählen (Liste/Suche/Update)", ) + parser.add_argument( + "--no-cache", + action="store_true", + help="den Produkt->Kategorie-Cache nicht nutzen (alles neu kategorisieren)", + ) args = parser.parse_args(argv) # Reiner Listen-/Suchmodus -- braucht keinen Ort und keinen Key. @@ -102,12 +107,28 @@ def main(argv: list[str] | None = None) -> int: from .kategorisieren import baue_kategorisierer, kategorisiere from .uebersicht import rendern + cache = None + if not args.no_cache: + from .produktcache import ProduktCache + + cache = ProduktCache() + stat: dict = {} try: - kat = kategorisiere(list(fetch.angebote), baue_kategorisierer(anbieter, modell)) + kat = kategorisiere( + list(fetch.angebote), + baue_kategorisierer(anbieter, modell), + cache=cache, + statistik=stat, + ) except AbbruchFehler as e: print(e.als_text(), file=sys.stderr) return 2 + if cache is not None: + print( + f"({stat.get('aus_cache', 0)} aus Cache · {stat.get('neu', 0)} neu kategorisiert)", + file=sys.stderr, + ) print(rendern(fetch, kat)) return 0 diff --git a/src/angebote/kategorisieren.py b/src/angebote/kategorisieren.py index d6a240d..9d42865 100644 --- a/src/angebote/kategorisieren.py +++ b/src/angebote/kategorisieren.py @@ -40,14 +40,19 @@ def kategorisiere( batch_groesse: int = 25, fortschritt=None, parallel: int = 8, + cache=None, + statistik: dict | None = None, ) -> list[KategorisiertesAngebot]: """Ordnet jedes Angebot einer Produktgruppe zu, ohne Daten zu verändern. - `fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches) -- - für Live-Anzeigen (z. B. die Web-UI). + `fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches). `parallel`: bis zu so viele Batches gleichzeitig (LLM-Calls sind I/O-bound). - Ändert die Logik nicht -- nur die Ausführungs-Reihenfolge. Die Zuordnung - erfolgt id-basiert, ist also reihenfolge-unabhängig. + `cache`: optionaler ProduktCache. Bekannte Produkte werden per Lookup direkt + übernommen (kein LLM); nur unbekannte gehen ans Modell, dedupliziert + (ein Produkt = ein Posten). Neue SICHERE Zuordnungen werden zurück- + geschrieben. Default None -> heutiges Verhalten, unverändert. + `statistik`: optionales dict, wird mit {aus_cache, neu} befüllt. + Die Zuordnung ist id-basiert, also reihenfolge- (und parallel-)unabhängig. """ import threading @@ -55,9 +60,37 @@ def kategorisiere( ergebnis: dict[str, KategorisiertesAngebot] = {} lock = threading.Lock() + # --- Cache-Phase: bekannte Produkte übernehmen, Rest deduplizieren ------- + aids_von_schluessel: dict[str, list[str]] = {} + schluessel_von_repr: dict[str, str] = {} # Repräsentant-id -> Produkt-Schlüssel + neu_fuer_cache: dict[str, str] = {} + aus_cache = 0 + modell_name = getattr(kategorisierer, "_modell", None) + + if cache is not None: + from .produktcache import produkt_schluessel + + repraesentant: dict[str, Angebot] = {} + for a in angebote: + s = produkt_schluessel(a.titel, a.marke) + gruppe = cache.hole(s) + if gruppe is not None: + ergebnis[a.angebot_id] = KategorisiertesAngebot( + angebot=a, gruppe=gruppe, unsicher=False + ) + aus_cache += 1 + else: + aids_von_schluessel.setdefault(s, []).append(a.angebot_id) + if s not in repraesentant: + repraesentant[s] = a + schluessel_von_repr[a.angebot_id] = s + zu_kategorisieren = list(repraesentant.values()) + else: + zu_kategorisieren = angebote + batches = [ - angebote[start : start + batch_groesse] - for start in range(0, len(angebote), batch_groesse) + zu_kategorisieren[start : start + batch_groesse] + for start in range(0, len(zu_kategorisieren), batch_groesse) ] gesamt_batches = max(1, len(batches)) erledigt = [0] @@ -69,18 +102,29 @@ def kategorisiere( ] return kategorisierer.klassifiziere(posten) + def setze(aid, gruppe, unsicher): + if aid not in original or aid in ergebnis: + return + ergebnis[aid] = KategorisiertesAngebot( + angebot=original[aid], gruppe=gruppe, unsicher=unsicher + ) + def uebernehmen(antworten): with lock: for ant in antworten: aid = ant.get("id") - if aid not in original or aid in ergebnis: - continue # fremde/duplizierte ID ignorieren gruppe, unsicher = _bereinige_gruppe( ant.get("gruppe"), ant.get("unsicher") ) - ergebnis[aid] = KategorisiertesAngebot( - angebot=original[aid], gruppe=gruppe, unsicher=unsicher - ) + schluessel = schluessel_von_repr.get(aid) + if schluessel is not None: + # Repräsentant -> Ergebnis auf ALLE Angebote desselben Produkts + for ziel in aids_von_schluessel[schluessel]: + setze(ziel, gruppe, unsicher) + if not unsicher: + neu_fuer_cache[schluessel] = gruppe + else: + setze(aid, gruppe, unsicher) erledigt[0] += 1 if fortschritt is not None: fortschritt(erledigt[0], gesamt_batches) @@ -96,6 +140,16 @@ def kategorisiere( for batch in batches: uebernehmen(verarbeite(batch)) + # Write-Back: neue, SICHERE Zuordnungen in den Cache (ein executemany). + if cache is not None and neu_fuer_cache: + cache.schreibe_viele( + [(s, g, modell_name) for s, g in neu_fuer_cache.items()] + ) + + if statistik is not None: + statistik["aus_cache"] = aus_cache + statistik["neu"] = len(zu_kategorisieren) + # Posten, die das Modell nicht (gültig) beantwortet hat: ehrlich als # unsicher mit Fallback markieren -- nicht still einsortieren. out: list[KategorisiertesAngebot] = [] diff --git a/src/angebote/produktcache.py b/src/angebote/produktcache.py new file mode 100644 index 0000000..fc2d682 --- /dev/null +++ b/src/angebote/produktcache.py @@ -0,0 +1,129 @@ +"""Produkt→Kategorie-Cache -- Teil des Kategorisier-Teils (Stufe 2). + +Speichert pro PRODUKT-Identität (Titel + Marke, NICHT pro Angebot/Preis) die +einmal vom LLM ermittelte Produktgruppe. So muss ein bekanntes Produkt nicht +erneut eingeordnet werden -- über Zeit sinkt die LLM-Last drastisch (nur noch +*neue* Produkte gehen ans Modell), und ein günstiges Mini-Model genügt. + +Schnitt-konform: + * Es werden ausschließlich Werte gespeichert, die vom LLM stammen (Gruppe) -- + NIE Angebotsdaten (Preis/Händler/Gültigkeit). Der Cache erinnert nur an eine + bereits getroffene Einordnung; er repariert keine Daten. + * Er importiert KEIN LLM-Modul und keinen Kategorisier-Code (nur `config`). + * Geschlossene Liste: Gelesen UND geschrieben werden nur Gruppen aus + PRODUKTGRUPPEN. Eine off-list-Zeile (z. B. manipulierte DB oder geänderte + Liste) wird beim Lesen verworfen -- selbstheilend, kein TTL nötig. + * Nur SICHERE Zuordnungen werden abgelegt (siehe kategorisieren.py) -- so ist + jeder Cache-Treffer per Konstruktion sicher. + +Der Cache ist global (ort-/wochenübergreifend): Die Kategorie eines Produkts ist +eine objektive Eigenschaft, unabhängig von PLZ, Woche oder Modell. +""" + +from __future__ import annotations + +import hashlib +import re +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from .config import PRODUKTGRUPPEN + +# Globaler Ablageort, neben dem Roh-Cache. `/data/` ist bereits in .gitignore. +STANDARD_DB = Path("data/kategorie_cache.sqlite") + +_WS = re.compile(r"\s+") + + +def _norm(text: str | None) -> str: + return _WS.sub(" ", (text or "").strip().lower()) + + +def produkt_schluessel(titel: str, marke: str | None) -> str: + """Stabile Produkt-Identität aus Titel + Marke (mengen-invariant). + + Menge bleibt bewusst draußen: 'Butter 250 g' und 'Butter 500 g' sind dieselbe + Produktgruppe -- das maximiert die Trefferquote. + """ + roh = _norm(titel) + "|" + _norm(marke) + return hashlib.sha1(roh.encode("utf-8")).hexdigest()[:16] + + +@dataclass(frozen=True) +class CacheEintrag: + schluessel: str + gruppe: str + modell: str | None + gesehen_am: str # ISO + + +class ProduktCache: + """Schneller, persistenter Produkt→Gruppe-Cache (SQLite + In-Memory-dict). + + Beim ersten Zugriff wird die ganze (kleine) Tabelle einmalig in ein dict + geladen -> Lookups sind O(1) im RAM, kein SQL pro Posten. Geschrieben wird + gebündelt (`schreibe_viele`) in einer Transaktion. + """ + + def __init__(self, *, db_pfad: Path | str | None = None) -> None: + self._pfad = Path(db_pfad) if db_pfad else STANDARD_DB + self._mem: dict[str, str] | None = None # lazy + self._init_db() + + # -- intern ----------------------------------------------------------- + + def _verbinde(self): + import sqlite3 + + self._pfad.parent.mkdir(parents=True, exist_ok=True) + return sqlite3.connect(str(self._pfad)) + + def _init_db(self) -> None: + with self._verbinde() as con: + con.execute( + "CREATE TABLE IF NOT EXISTS produkt_kategorie (" + " schluessel TEXT PRIMARY KEY," + " gruppe TEXT NOT NULL," + " modell TEXT," + " gesehen_am TEXT" + ")" + ) + + def _lade(self) -> dict[str, str]: + if self._mem is None: + with self._verbinde() as con: + rows = con.execute( + "SELECT schluessel, gruppe FROM produkt_kategorie" + ).fetchall() + # Whitelist auch beim Laden: nur gültige Gruppen in den Speicher. + self._mem = {k: g for k, g in rows if g in PRODUKTGRUPPEN} + return self._mem + + # -- öffentliche API -------------------------------------------------- + + def hole(self, schluessel: str) -> str | None: + """Gruppe für ein Produkt -- nur wenn sie in der geschlossenen Liste ist.""" + gruppe = self._lade().get(schluessel) + return gruppe if gruppe in PRODUKTGRUPPEN else None + + def schreibe_viele(self, eintraege: list[tuple[str, str, str | None]]) -> int: + """Speichert (schluessel, gruppe, modell)-Tupel. Nur Gruppen aus der + geschlossenen Liste werden übernommen. Gibt die Zahl der Schreibungen.""" + gueltig = [(s, g, m) for (s, g, m) in eintraege if g in PRODUKTGRUPPEN and s] + if not gueltig: + return 0 + jetzt = datetime.now().isoformat() + with self._verbinde() as con: + con.executemany( + "INSERT OR REPLACE INTO produkt_kategorie " + "(schluessel, gruppe, modell, gesehen_am) VALUES (?, ?, ?, ?)", + [(s, g, m, jetzt) for (s, g, m) in gueltig], + ) + mem = self._lade() + for s, g, _ in gueltig: + mem[s] = g + return len(gueltig) + + def groesse(self) -> int: + return len(self._lade()) diff --git a/src/angebote/uebersicht.py b/src/angebote/uebersicht.py index d62f392..0343678 100644 --- a/src/angebote/uebersicht.py +++ b/src/angebote/uebersicht.py @@ -47,6 +47,8 @@ def als_struktur( *, modell: str | None = None, anbieter: str | None = None, + aus_cache: int | None = None, + neu: int | None = None, ) -> dict: """Strukturierte Ausgabe für die Web-UI -- dieselben belegten Felder wie der Markdown-Renderer, nur als JSON-fähiges dict. Leere Gruppen bleiben enthalten @@ -62,6 +64,8 @@ def als_struktur( "unsicher": sum(1 for k in kategorisiert if k.unsicher), "modell": modell, "anbieter": anbieter, + "aus_cache": aus_cache, + "neu": neu, "quellen": list(fetch.abgedeckte_quellen), "haendler": list(fetch.gesehene_haendler), "hinweise": list(fetch.hinweise), diff --git a/src/angebote/web.py b/src/angebote/web.py index 436bd8a..50138e0 100644 --- a/src/angebote/web.py +++ b/src/angebote/web.py @@ -234,6 +234,7 @@ def _run_kategorisieren(job_id, plz, fetch, modell, anbieter, key) -> None: job = _jobs[job_id] try: from .kategorisieren import baue_kategorisierer, kategorisiere + from .produktcache import ProduktCache from .uebersicht import als_struktur kt = baue_kategorisierer(anbieter, modell, api_key=key) @@ -252,15 +253,21 @@ def _run_kategorisieren(job_id, plz, fetch, modell, anbieter, key) -> None: if done == total or done % 5 == 0: # nicht jede Batch -> Log lesbar print(f"[Stufe 2] PLZ {plz} · Batch {done}/{total}", flush=True) - kat = kategorisiere(list(fetch.angebote), kt, fortschritt=fort) + cache = ProduktCache() # Produkt->Kategorie-Cache: bekannte Produkte ohne LLM + stat: dict = {} + kat = kategorisiere( + list(fetch.angebote), kt, fortschritt=fort, cache=cache, statistik=stat + ) job["ergebnis"] = als_struktur( - fetch, kat, modell=modell_genutzt, anbieter=anbieter + fetch, kat, modell=modell_genutzt, anbieter=anbieter, + aus_cache=stat.get("aus_cache"), neu=stat.get("neu"), ) job["status"] = "fertig" _ergebnis_cache[(plz, anbieter, modell)] = job["ergebnis"] print( - f"[Stufe 2] fertig · PLZ {plz} · {job['ergebnis']['unsicher']} unsicher", + f"[Stufe 2] fertig · PLZ {plz} · {stat.get('aus_cache', 0)} aus Cache · " + f"{stat.get('neu', 0)} neu · {job['ergebnis']['unsicher']} unsicher", flush=True, ) except AbbruchFehler as e: diff --git a/tests/fakes.py b/tests/fakes.py index fd0c3aa..e2b6c50 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -37,6 +37,24 @@ class FakeKategorisierer: return self._fn(posten) +class CountingFakeKategorisierer: + """Zählt die ans LLM gegebenen Posten -- für Cache-Tests (Hit/Dedup).""" + + def __init__(self, gruppe: str = "Sonstiges", unsicher: bool = False): + self.gesehen = 0 + self.titel: list[str] = [] + self._gruppe = gruppe + self._unsicher = unsicher + + def klassifiziere(self, posten: list[dict]) -> list[dict]: + self.gesehen += len(posten) + self.titel.extend(p["titel"] for p in posten) + return [ + {"id": p["id"], "gruppe": self._gruppe, "unsicher": self._unsicher} + for p in posten + ] + + def beispiel_angebot(titel="Butter", **kw) -> Angebot: """Belegtes Angebot mit Default-Pflichtfeldern; einzeln überschreibbar.""" daten = dict( diff --git a/tests/test_kategorisieren_cache.py b/tests/test_kategorisieren_cache.py new file mode 100644 index 0000000..e8a2d38 --- /dev/null +++ b/tests/test_kategorisieren_cache.py @@ -0,0 +1,64 @@ +"""Cache-Integration in kategorisiere() -- offline, eigene DB pro Test.""" + +from angebote.kategorisieren import kategorisiere +from angebote.produktcache import ProduktCache +from tests.fakes import CountingFakeKategorisierer, beispiel_angebot + + +def _cache(tmp_path): + return ProduktCache(db_pfad=tmp_path / "c.sqlite") + + +def test_zweiter_lauf_komplett_aus_cache(tmp_path): + cache = _cache(tmp_path) + angebote = [ + beispiel_angebot("Butter", marke="Meggle"), + beispiel_angebot("Apfel", marke=None), + ] + # 1. Lauf: alles neu ans LLM, wird gecacht + fake1 = CountingFakeKategorisierer("Sonstiges", unsicher=False) + stat1 = {} + kategorisiere(angebote, fake1, cache=cache, statistik=stat1) + assert fake1.gesehen == 2 + assert stat1 == {"aus_cache": 0, "neu": 2} + + # 2. Lauf: nichts mehr ans LLM (alles aus Cache) + fake2 = CountingFakeKategorisierer("Sonstiges") + stat2 = {} + erg = kategorisiere(angebote, fake2, cache=cache, statistik=stat2) + assert fake2.gesehen == 0 + assert stat2 == {"aus_cache": 2, "neu": 0} + assert all(k.gruppe == "Sonstiges" and not k.unsicher for k in erg) + + +def test_dedup_ein_produkt_nur_ein_posten(tmp_path): + cache = _cache(tmp_path) + # zwei Angebote DESSELBEN Produkts (Titel+Marke), aber versch. Preis/Händler + a1 = beispiel_angebot("Butter", marke="Meggle", preis=1.49, haendler="REWE") + a2 = beispiel_angebot("Butter", marke="Meggle", preis=1.99, haendler="EDEKA") + assert a1.angebot_id != a2.angebot_id + fake = CountingFakeKategorisierer("Molkereiprodukte & Eier") + erg = kategorisiere([a1, a2], fake, cache=cache) + assert fake.gesehen == 1 # nur EIN Posten ans LLM + assert len(erg) == 2 + assert all(k.gruppe == "Molkereiprodukte & Eier" for k in erg) + + +def test_unsichere_werden_nicht_gecacht(tmp_path): + cache = _cache(tmp_path) + a = beispiel_angebot("Hafer-Pflanzendrink", marke=None) + fake = CountingFakeKategorisierer("Molkereiprodukte & Eier", unsicher=True) + kategorisiere([a], fake, cache=cache) + assert cache.groesse() == 0 # unsicher -> nicht gespeichert + # Folge-Lauf fragt erneut (keine Propagation des Zweifels) + fake2 = CountingFakeKategorisierer("Sonstiges") + kategorisiere([a], fake2, cache=cache) + assert fake2.gesehen == 1 + + +def test_ohne_cache_geht_alles_ans_llm(tmp_path): + angebote = [beispiel_angebot("Butter"), beispiel_angebot("Apfel", marke=None)] + fake = CountingFakeKategorisierer("Sonstiges") + erg = kategorisiere(angebote, fake) # cache=None + assert fake.gesehen == 2 + assert len(erg) == 2 diff --git a/tests/test_produktcache.py b/tests/test_produktcache.py new file mode 100644 index 0000000..210b493 --- /dev/null +++ b/tests/test_produktcache.py @@ -0,0 +1,101 @@ +"""Tests für den Produkt→Kategorie-Cache -- offline, eigene DB pro Test.""" + +import subprocess +import sys +from pathlib import Path + +from angebote.produktcache import ProduktCache, produkt_schluessel + +SRC = Path(__file__).resolve().parents[1] / "src" + + +def _cache(tmp_path) -> ProduktCache: + return ProduktCache(db_pfad=tmp_path / "cache.sqlite") + + +# -- Schlüssel ---------------------------------------------------------------- + + +def test_schluessel_ist_mengen_invariant(): + # Titel + Marke bestimmen den Schlüssel; Menge spielt keine Rolle. + assert produkt_schluessel("Butter", "Meggle") == produkt_schluessel( + " butter ", "MEGGLE" + ) + + +def test_schluessel_unterscheidet_marke(): + assert produkt_schluessel("Butter", "Meggle") != produkt_schluessel( + "Butter", "Kerrygold" + ) + + +# -- Round-Trip / Persistenz -------------------------------------------------- + + +def test_round_trip_ueber_instanzgrenzen(tmp_path): + db = tmp_path / "c.sqlite" + c1 = ProduktCache(db_pfad=db) + s = produkt_schluessel("Toffifee", "Storck") + c1.schreibe_viele([(s, "Süßwaren & Snacks", "deepseek")]) + # frische Instanz auf derselben DB + c2 = ProduktCache(db_pfad=db) + assert c2.hole(s) == "Süßwaren & Snacks" + assert c2.groesse() == 1 + + +def test_unbekannter_schluessel_gibt_none(tmp_path): + c = _cache(tmp_path) + assert c.hole("gibtsnicht") is None + + +# -- Geschlossene Liste (Whitelist) ------------------------------------------ + + +def test_off_list_gruppe_wird_nicht_geschrieben(tmp_path): + c = _cache(tmp_path) + n = c.schreibe_viele([("k1", "Weltraumzeug", None)]) + assert n == 0 + assert c.hole("k1") is None + + +def test_off_list_zeile_in_db_wird_beim_lesen_verworfen(tmp_path): + import sqlite3 + + db = tmp_path / "c.sqlite" + ProduktCache(db_pfad=db) # legt Tabelle an + # manipulierte Zeile direkt in die DB schreiben + with sqlite3.connect(str(db)) as con: + con.execute( + "INSERT INTO produkt_kategorie VALUES (?,?,?,?)", + ("k1", "Quatschgruppe", None, "2026-01-01"), + ) + c = ProduktCache(db_pfad=db) + assert c.hole("k1") is None # Whitelist filtert sie heraus + assert c.groesse() == 0 + + +# -- Schnitt: kein LLM im Cache ---------------------------------------------- + + +def test_cache_laedt_kein_anthropic(): + code = ( + "import sys; import angebote.produktcache; " + "assert 'anthropic' not in sys.modules, 'Cache hat anthropic geladen'; " + "print('ok')" + ) + proc = subprocess.run( + [sys.executable, "-c", code], cwd=str(SRC), capture_output=True, text=True + ) + assert proc.returncode == 0, proc.stderr + assert "ok" in proc.stdout + + +def test_cache_importiert_kein_llm(): + quelltext = (SRC / "angebote" / "produktcache.py").read_text("utf-8") + import_zeilen = "\n".join( + z for z in quelltext.splitlines() + if z.strip().startswith(("import ", "from ")) + ).lower() + assert "anthropic" not in import_zeilen + assert "openai" not in import_zeilen + assert "kategorisieren" not in import_zeilen