Produkt->Kategorie-Cache: bekannte Produkte ohne LLM (SQLite, modellübergreifend)

Neuer produktcache.py (Stufe 2): speichert pro Produkt (Titel+Marke, mengen-
invariant) die einmal ermittelte Gruppe in SQLite, bulk-load ins dict -> O(1).
Schnitt gewahrt: kein LLM-Import, nur Gruppe (nie Angebotsdaten), Whitelist
beim Lesen+Schreiben, nur SICHERE Zuordnungen gecacht.

kategorisiere(cache=, statistik=): Lookup vor dem LLM, Dedup im Lauf (ein
Produkt = ein Posten), Write-Back danach. Parallel-/id-Logik unverändert.
als_struktur/web/cli verdrahtet (Statistik 'X aus Cache · Y neu', --no-cache).

Live verifiziert (1903 Angebote PLZ 60487): Lauf 1 (gemini) 1551 neu; Lauf 2
(deepseek, anderes Modell) nur 110 neu, 1765 aus Cache -> ~93% weniger LLM-Calls,
modellübergreifend. +12 Tests (Round-Trip, Whitelist, Hit-vermeidet-Call, Dedup,
nur-sichere, Schnitt). 70 Tests grün.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jeuner 2026-06-03 18:37:12 +02:00
parent 2029eb9fcf
commit 077a877480
8 changed files with 413 additions and 15 deletions

View file

@ -55,6 +55,11 @@ def main(argv: list[str] | None = None) -> int:
action="store_true",
help="vor dem Lauf das OpenRouter-Modell interaktiv wählen (Liste/Suche/Update)",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="den Produkt->Kategorie-Cache nicht nutzen (alles neu kategorisieren)",
)
args = parser.parse_args(argv)
# Reiner Listen-/Suchmodus -- braucht keinen Ort und keinen Key.
@ -102,12 +107,28 @@ def main(argv: list[str] | None = None) -> int:
from .kategorisieren import baue_kategorisierer, kategorisiere
from .uebersicht import rendern
cache = None
if not args.no_cache:
from .produktcache import ProduktCache
cache = ProduktCache()
stat: dict = {}
try:
kat = kategorisiere(list(fetch.angebote), baue_kategorisierer(anbieter, modell))
kat = kategorisiere(
list(fetch.angebote),
baue_kategorisierer(anbieter, modell),
cache=cache,
statistik=stat,
)
except AbbruchFehler as e:
print(e.als_text(), file=sys.stderr)
return 2
if cache is not None:
print(
f"({stat.get('aus_cache', 0)} aus Cache · {stat.get('neu', 0)} neu kategorisiert)",
file=sys.stderr,
)
print(rendern(fetch, kat))
return 0

View file

@ -40,14 +40,19 @@ def kategorisiere(
batch_groesse: int = 25,
fortschritt=None,
parallel: int = 8,
cache=None,
statistik: dict | None = None,
) -> list[KategorisiertesAngebot]:
"""Ordnet jedes Angebot einer Produktgruppe zu, ohne Daten zu verändern.
`fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches) --
für Live-Anzeigen (z. B. die Web-UI).
`fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches).
`parallel`: bis zu so viele Batches gleichzeitig (LLM-Calls sind I/O-bound).
Ändert die Logik nicht -- nur die Ausführungs-Reihenfolge. Die Zuordnung
erfolgt id-basiert, ist also reihenfolge-unabhängig.
`cache`: optionaler ProduktCache. Bekannte Produkte werden per Lookup direkt
übernommen (kein LLM); nur unbekannte gehen ans Modell, dedupliziert
(ein Produkt = ein Posten). Neue SICHERE Zuordnungen werden zurück-
geschrieben. Default None -> heutiges Verhalten, unverändert.
`statistik`: optionales dict, wird mit {aus_cache, neu} befüllt.
Die Zuordnung ist id-basiert, also reihenfolge- (und parallel-)unabhängig.
"""
import threading
@ -55,9 +60,37 @@ def kategorisiere(
ergebnis: dict[str, KategorisiertesAngebot] = {}
lock = threading.Lock()
# --- Cache-Phase: bekannte Produkte übernehmen, Rest deduplizieren -------
aids_von_schluessel: dict[str, list[str]] = {}
schluessel_von_repr: dict[str, str] = {} # Repräsentant-id -> Produkt-Schlüssel
neu_fuer_cache: dict[str, str] = {}
aus_cache = 0
modell_name = getattr(kategorisierer, "_modell", None)
if cache is not None:
from .produktcache import produkt_schluessel
repraesentant: dict[str, Angebot] = {}
for a in angebote:
s = produkt_schluessel(a.titel, a.marke)
gruppe = cache.hole(s)
if gruppe is not None:
ergebnis[a.angebot_id] = KategorisiertesAngebot(
angebot=a, gruppe=gruppe, unsicher=False
)
aus_cache += 1
else:
aids_von_schluessel.setdefault(s, []).append(a.angebot_id)
if s not in repraesentant:
repraesentant[s] = a
schluessel_von_repr[a.angebot_id] = s
zu_kategorisieren = list(repraesentant.values())
else:
zu_kategorisieren = angebote
batches = [
angebote[start : start + batch_groesse]
for start in range(0, len(angebote), batch_groesse)
zu_kategorisieren[start : start + batch_groesse]
for start in range(0, len(zu_kategorisieren), batch_groesse)
]
gesamt_batches = max(1, len(batches))
erledigt = [0]
@ -69,18 +102,29 @@ def kategorisiere(
]
return kategorisierer.klassifiziere(posten)
def setze(aid, gruppe, unsicher):
if aid not in original or aid in ergebnis:
return
ergebnis[aid] = KategorisiertesAngebot(
angebot=original[aid], gruppe=gruppe, unsicher=unsicher
)
def uebernehmen(antworten):
with lock:
for ant in antworten:
aid = ant.get("id")
if aid not in original or aid in ergebnis:
continue # fremde/duplizierte ID ignorieren
gruppe, unsicher = _bereinige_gruppe(
ant.get("gruppe"), ant.get("unsicher")
)
ergebnis[aid] = KategorisiertesAngebot(
angebot=original[aid], gruppe=gruppe, unsicher=unsicher
)
schluessel = schluessel_von_repr.get(aid)
if schluessel is not None:
# Repräsentant -> Ergebnis auf ALLE Angebote desselben Produkts
for ziel in aids_von_schluessel[schluessel]:
setze(ziel, gruppe, unsicher)
if not unsicher:
neu_fuer_cache[schluessel] = gruppe
else:
setze(aid, gruppe, unsicher)
erledigt[0] += 1
if fortschritt is not None:
fortschritt(erledigt[0], gesamt_batches)
@ -96,6 +140,16 @@ def kategorisiere(
for batch in batches:
uebernehmen(verarbeite(batch))
# Write-Back: neue, SICHERE Zuordnungen in den Cache (ein executemany).
if cache is not None and neu_fuer_cache:
cache.schreibe_viele(
[(s, g, modell_name) for s, g in neu_fuer_cache.items()]
)
if statistik is not None:
statistik["aus_cache"] = aus_cache
statistik["neu"] = len(zu_kategorisieren)
# Posten, die das Modell nicht (gültig) beantwortet hat: ehrlich als
# unsicher mit Fallback markieren -- nicht still einsortieren.
out: list[KategorisiertesAngebot] = []

View file

@ -0,0 +1,129 @@
"""Produkt→Kategorie-Cache -- Teil des Kategorisier-Teils (Stufe 2).
Speichert pro PRODUKT-Identität (Titel + Marke, NICHT pro Angebot/Preis) die
einmal vom LLM ermittelte Produktgruppe. So muss ein bekanntes Produkt nicht
erneut eingeordnet werden -- über Zeit sinkt die LLM-Last drastisch (nur noch
*neue* Produkte gehen ans Modell), und ein günstiges Mini-Model genügt.
Schnitt-konform:
* Es werden ausschließlich Werte gespeichert, die vom LLM stammen (Gruppe) --
NIE Angebotsdaten (Preis/Händler/Gültigkeit). Der Cache erinnert nur an eine
bereits getroffene Einordnung; er repariert keine Daten.
* Er importiert KEIN LLM-Modul und keinen Kategorisier-Code (nur `config`).
* Geschlossene Liste: Gelesen UND geschrieben werden nur Gruppen aus
PRODUKTGRUPPEN. Eine off-list-Zeile (z. B. manipulierte DB oder geänderte
Liste) wird beim Lesen verworfen -- selbstheilend, kein TTL nötig.
* Nur SICHERE Zuordnungen werden abgelegt (siehe kategorisieren.py) -- so ist
jeder Cache-Treffer per Konstruktion sicher.
Der Cache ist global (ort-/wochenübergreifend): Die Kategorie eines Produkts ist
eine objektive Eigenschaft, unabhängig von PLZ, Woche oder Modell.
"""
from __future__ import annotations
import hashlib
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from .config import PRODUKTGRUPPEN
# Globaler Ablageort, neben dem Roh-Cache. `/data/` ist bereits in .gitignore.
STANDARD_DB = Path("data/kategorie_cache.sqlite")
_WS = re.compile(r"\s+")
def _norm(text: str | None) -> str:
return _WS.sub(" ", (text or "").strip().lower())
def produkt_schluessel(titel: str, marke: str | None) -> str:
"""Stabile Produkt-Identität aus Titel + Marke (mengen-invariant).
Menge bleibt bewusst draußen: 'Butter 250 g' und 'Butter 500 g' sind dieselbe
Produktgruppe -- das maximiert die Trefferquote.
"""
roh = _norm(titel) + "|" + _norm(marke)
return hashlib.sha1(roh.encode("utf-8")).hexdigest()[:16]
@dataclass(frozen=True)
class CacheEintrag:
schluessel: str
gruppe: str
modell: str | None
gesehen_am: str # ISO
class ProduktCache:
"""Schneller, persistenter Produkt→Gruppe-Cache (SQLite + In-Memory-dict).
Beim ersten Zugriff wird die ganze (kleine) Tabelle einmalig in ein dict
geladen -> Lookups sind O(1) im RAM, kein SQL pro Posten. Geschrieben wird
gebündelt (`schreibe_viele`) in einer Transaktion.
"""
def __init__(self, *, db_pfad: Path | str | None = None) -> None:
self._pfad = Path(db_pfad) if db_pfad else STANDARD_DB
self._mem: dict[str, str] | None = None # lazy
self._init_db()
# -- intern -----------------------------------------------------------
def _verbinde(self):
import sqlite3
self._pfad.parent.mkdir(parents=True, exist_ok=True)
return sqlite3.connect(str(self._pfad))
def _init_db(self) -> None:
with self._verbinde() as con:
con.execute(
"CREATE TABLE IF NOT EXISTS produkt_kategorie ("
" schluessel TEXT PRIMARY KEY,"
" gruppe TEXT NOT NULL,"
" modell TEXT,"
" gesehen_am TEXT"
")"
)
def _lade(self) -> dict[str, str]:
if self._mem is None:
with self._verbinde() as con:
rows = con.execute(
"SELECT schluessel, gruppe FROM produkt_kategorie"
).fetchall()
# Whitelist auch beim Laden: nur gültige Gruppen in den Speicher.
self._mem = {k: g for k, g in rows if g in PRODUKTGRUPPEN}
return self._mem
# -- öffentliche API --------------------------------------------------
def hole(self, schluessel: str) -> str | None:
"""Gruppe für ein Produkt -- nur wenn sie in der geschlossenen Liste ist."""
gruppe = self._lade().get(schluessel)
return gruppe if gruppe in PRODUKTGRUPPEN else None
def schreibe_viele(self, eintraege: list[tuple[str, str, str | None]]) -> int:
"""Speichert (schluessel, gruppe, modell)-Tupel. Nur Gruppen aus der
geschlossenen Liste werden übernommen. Gibt die Zahl der Schreibungen."""
gueltig = [(s, g, m) for (s, g, m) in eintraege if g in PRODUKTGRUPPEN and s]
if not gueltig:
return 0
jetzt = datetime.now().isoformat()
with self._verbinde() as con:
con.executemany(
"INSERT OR REPLACE INTO produkt_kategorie "
"(schluessel, gruppe, modell, gesehen_am) VALUES (?, ?, ?, ?)",
[(s, g, m, jetzt) for (s, g, m) in gueltig],
)
mem = self._lade()
for s, g, _ in gueltig:
mem[s] = g
return len(gueltig)
def groesse(self) -> int:
return len(self._lade())

View file

@ -47,6 +47,8 @@ def als_struktur(
*,
modell: str | None = None,
anbieter: str | None = None,
aus_cache: int | None = None,
neu: int | None = None,
) -> dict:
"""Strukturierte Ausgabe für die Web-UI -- dieselben belegten Felder wie der
Markdown-Renderer, nur als JSON-fähiges dict. Leere Gruppen bleiben enthalten
@ -62,6 +64,8 @@ def als_struktur(
"unsicher": sum(1 for k in kategorisiert if k.unsicher),
"modell": modell,
"anbieter": anbieter,
"aus_cache": aus_cache,
"neu": neu,
"quellen": list(fetch.abgedeckte_quellen),
"haendler": list(fetch.gesehene_haendler),
"hinweise": list(fetch.hinweise),

View file

@ -234,6 +234,7 @@ def _run_kategorisieren(job_id, plz, fetch, modell, anbieter, key) -> None:
job = _jobs[job_id]
try:
from .kategorisieren import baue_kategorisierer, kategorisiere
from .produktcache import ProduktCache
from .uebersicht import als_struktur
kt = baue_kategorisierer(anbieter, modell, api_key=key)
@ -252,15 +253,21 @@ def _run_kategorisieren(job_id, plz, fetch, modell, anbieter, key) -> None:
if done == total or done % 5 == 0: # nicht jede Batch -> Log lesbar
print(f"[Stufe 2] PLZ {plz} · Batch {done}/{total}", flush=True)
kat = kategorisiere(list(fetch.angebote), kt, fortschritt=fort)
cache = ProduktCache() # Produkt->Kategorie-Cache: bekannte Produkte ohne LLM
stat: dict = {}
kat = kategorisiere(
list(fetch.angebote), kt, fortschritt=fort, cache=cache, statistik=stat
)
job["ergebnis"] = als_struktur(
fetch, kat, modell=modell_genutzt, anbieter=anbieter
fetch, kat, modell=modell_genutzt, anbieter=anbieter,
aus_cache=stat.get("aus_cache"), neu=stat.get("neu"),
)
job["status"] = "fertig"
_ergebnis_cache[(plz, anbieter, modell)] = job["ergebnis"]
print(
f"[Stufe 2] fertig · PLZ {plz} · {job['ergebnis']['unsicher']} unsicher",
f"[Stufe 2] fertig · PLZ {plz} · {stat.get('aus_cache', 0)} aus Cache · "
f"{stat.get('neu', 0)} neu · {job['ergebnis']['unsicher']} unsicher",
flush=True,
)
except AbbruchFehler as e:

View file

@ -37,6 +37,24 @@ class FakeKategorisierer:
return self._fn(posten)
class CountingFakeKategorisierer:
"""Zählt die ans LLM gegebenen Posten -- für Cache-Tests (Hit/Dedup)."""
def __init__(self, gruppe: str = "Sonstiges", unsicher: bool = False):
self.gesehen = 0
self.titel: list[str] = []
self._gruppe = gruppe
self._unsicher = unsicher
def klassifiziere(self, posten: list[dict]) -> list[dict]:
self.gesehen += len(posten)
self.titel.extend(p["titel"] for p in posten)
return [
{"id": p["id"], "gruppe": self._gruppe, "unsicher": self._unsicher}
for p in posten
]
def beispiel_angebot(titel="Butter", **kw) -> Angebot:
"""Belegtes Angebot mit Default-Pflichtfeldern; einzeln überschreibbar."""
daten = dict(

View file

@ -0,0 +1,64 @@
"""Cache-Integration in kategorisiere() -- offline, eigene DB pro Test."""
from angebote.kategorisieren import kategorisiere
from angebote.produktcache import ProduktCache
from tests.fakes import CountingFakeKategorisierer, beispiel_angebot
def _cache(tmp_path):
return ProduktCache(db_pfad=tmp_path / "c.sqlite")
def test_zweiter_lauf_komplett_aus_cache(tmp_path):
cache = _cache(tmp_path)
angebote = [
beispiel_angebot("Butter", marke="Meggle"),
beispiel_angebot("Apfel", marke=None),
]
# 1. Lauf: alles neu ans LLM, wird gecacht
fake1 = CountingFakeKategorisierer("Sonstiges", unsicher=False)
stat1 = {}
kategorisiere(angebote, fake1, cache=cache, statistik=stat1)
assert fake1.gesehen == 2
assert stat1 == {"aus_cache": 0, "neu": 2}
# 2. Lauf: nichts mehr ans LLM (alles aus Cache)
fake2 = CountingFakeKategorisierer("Sonstiges")
stat2 = {}
erg = kategorisiere(angebote, fake2, cache=cache, statistik=stat2)
assert fake2.gesehen == 0
assert stat2 == {"aus_cache": 2, "neu": 0}
assert all(k.gruppe == "Sonstiges" and not k.unsicher for k in erg)
def test_dedup_ein_produkt_nur_ein_posten(tmp_path):
cache = _cache(tmp_path)
# zwei Angebote DESSELBEN Produkts (Titel+Marke), aber versch. Preis/Händler
a1 = beispiel_angebot("Butter", marke="Meggle", preis=1.49, haendler="REWE")
a2 = beispiel_angebot("Butter", marke="Meggle", preis=1.99, haendler="EDEKA")
assert a1.angebot_id != a2.angebot_id
fake = CountingFakeKategorisierer("Molkereiprodukte & Eier")
erg = kategorisiere([a1, a2], fake, cache=cache)
assert fake.gesehen == 1 # nur EIN Posten ans LLM
assert len(erg) == 2
assert all(k.gruppe == "Molkereiprodukte & Eier" for k in erg)
def test_unsichere_werden_nicht_gecacht(tmp_path):
cache = _cache(tmp_path)
a = beispiel_angebot("Hafer-Pflanzendrink", marke=None)
fake = CountingFakeKategorisierer("Molkereiprodukte & Eier", unsicher=True)
kategorisiere([a], fake, cache=cache)
assert cache.groesse() == 0 # unsicher -> nicht gespeichert
# Folge-Lauf fragt erneut (keine Propagation des Zweifels)
fake2 = CountingFakeKategorisierer("Sonstiges")
kategorisiere([a], fake2, cache=cache)
assert fake2.gesehen == 1
def test_ohne_cache_geht_alles_ans_llm(tmp_path):
angebote = [beispiel_angebot("Butter"), beispiel_angebot("Apfel", marke=None)]
fake = CountingFakeKategorisierer("Sonstiges")
erg = kategorisiere(angebote, fake) # cache=None
assert fake.gesehen == 2
assert len(erg) == 2

101
tests/test_produktcache.py Normal file
View file

@ -0,0 +1,101 @@
"""Tests für den Produkt→Kategorie-Cache -- offline, eigene DB pro Test."""
import subprocess
import sys
from pathlib import Path
from angebote.produktcache import ProduktCache, produkt_schluessel
SRC = Path(__file__).resolve().parents[1] / "src"
def _cache(tmp_path) -> ProduktCache:
return ProduktCache(db_pfad=tmp_path / "cache.sqlite")
# -- Schlüssel ----------------------------------------------------------------
def test_schluessel_ist_mengen_invariant():
# Titel + Marke bestimmen den Schlüssel; Menge spielt keine Rolle.
assert produkt_schluessel("Butter", "Meggle") == produkt_schluessel(
" butter ", "MEGGLE"
)
def test_schluessel_unterscheidet_marke():
assert produkt_schluessel("Butter", "Meggle") != produkt_schluessel(
"Butter", "Kerrygold"
)
# -- Round-Trip / Persistenz --------------------------------------------------
def test_round_trip_ueber_instanzgrenzen(tmp_path):
db = tmp_path / "c.sqlite"
c1 = ProduktCache(db_pfad=db)
s = produkt_schluessel("Toffifee", "Storck")
c1.schreibe_viele([(s, "Süßwaren & Snacks", "deepseek")])
# frische Instanz auf derselben DB
c2 = ProduktCache(db_pfad=db)
assert c2.hole(s) == "Süßwaren & Snacks"
assert c2.groesse() == 1
def test_unbekannter_schluessel_gibt_none(tmp_path):
c = _cache(tmp_path)
assert c.hole("gibtsnicht") is None
# -- Geschlossene Liste (Whitelist) ------------------------------------------
def test_off_list_gruppe_wird_nicht_geschrieben(tmp_path):
c = _cache(tmp_path)
n = c.schreibe_viele([("k1", "Weltraumzeug", None)])
assert n == 0
assert c.hole("k1") is None
def test_off_list_zeile_in_db_wird_beim_lesen_verworfen(tmp_path):
import sqlite3
db = tmp_path / "c.sqlite"
ProduktCache(db_pfad=db) # legt Tabelle an
# manipulierte Zeile direkt in die DB schreiben
with sqlite3.connect(str(db)) as con:
con.execute(
"INSERT INTO produkt_kategorie VALUES (?,?,?,?)",
("k1", "Quatschgruppe", None, "2026-01-01"),
)
c = ProduktCache(db_pfad=db)
assert c.hole("k1") is None # Whitelist filtert sie heraus
assert c.groesse() == 0
# -- Schnitt: kein LLM im Cache ----------------------------------------------
def test_cache_laedt_kein_anthropic():
code = (
"import sys; import angebote.produktcache; "
"assert 'anthropic' not in sys.modules, 'Cache hat anthropic geladen'; "
"print('ok')"
)
proc = subprocess.run(
[sys.executable, "-c", code], cwd=str(SRC), capture_output=True, text=True
)
assert proc.returncode == 0, proc.stderr
assert "ok" in proc.stdout
def test_cache_importiert_kein_llm():
quelltext = (SRC / "angebote" / "produktcache.py").read_text("utf-8")
import_zeilen = "\n".join(
z for z in quelltext.splitlines()
if z.strip().startswith(("import ", "from "))
).lower()
assert "anthropic" not in import_zeilen
assert "openai" not in import_zeilen
assert "kategorisieren" not in import_zeilen