diff --git a/.gitignore b/.gitignore index cc4d8d8..88771ce 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,7 @@ __pycache__/ # Original-Input-Archive (Specs liegen kanonisch in .claude/skills/) /files.zip /temp/ + +# gespeicherte Rohdaten (Stufe 1, pro PLZ/Woche generiert) +/src/data/ +/data/ diff --git a/src/angebote/speicher.py b/src/angebote/speicher.py new file mode 100644 index 0000000..1cbe2d1 --- /dev/null +++ b/src/angebote/speicher.py @@ -0,0 +1,181 @@ +"""Persistenz der belegten Rohdaten (Stufe 1) -- deterministisch, KEIN LLM. + +Trennt die zwei Stufen des Workflows auch auf der Platte: + + * Stufe 1 (Fetch) schreibt die belegten, normalisierten Angebote pro PLZ und + Kalenderwoche hierher. Nichts wird interpretiert oder kategorisiert. + * Stufe 2 (Kategorisieren) liest sie von hier -- sie fetcht NICHT erneut. + +Bewusst rohes JSON der belegten Felder: Was die Quelle nicht hergibt, bleibt +`null` (kein Auffüllen). Die Datei belegt zusätzlich Herkunft (Quellen, geholt +am, gesehene Händler), damit der gespeicherte Stand selbst auditierbar ist. + +Der Round-Trip ist verlustfrei für die belegten Felder: `lade_rohdaten` +rekonstruiert echte `Angebot`-Objekte (frozen), sodass Stufe 2 exakt mit dem +arbeitet, was Stufe 1 belegt hat. +""" + +from __future__ import annotations + +from datetime import date, datetime +from pathlib import Path + +from .modell import Angebot, FetchErgebnis + +# Standard-Ablageort. Relativ zum Arbeitsverzeichnis des Servers (src/), damit +# der Pfad neben dem bestehenden Fetch-Cache (.cache/) liegt und nicht ins +# Paket eingreift. Über `basis_dir` injizierbar (Tests). +STANDARD_BASIS = Path("data/roh") + + +def _iso(d) -> str | None: + return d.isoformat() if d is not None else None + + +def _dat(s) -> date | None: + return date.fromisoformat(s) if s else None + + +def _angebot_dict(a: Angebot) -> dict: + """Serialisiert ein belegtes Angebot vollständig -- inkl. stabiler ID.""" + return { + "titel": a.titel, + "haendler": a.haendler, + "quelle": a.quelle, + "abgerufen_am": a.abgerufen_am.isoformat(), + "marke": a.marke, + "preis": a.preis, + "grundpreis": a.grundpreis, + "menge": a.menge, + "gueltig_von": _iso(a.gueltig_von), + "gueltig_bis": _iso(a.gueltig_bis), + "angebot_id": a.angebot_id, + } + + +def _angebot_aus_dict(d: dict) -> Angebot: + """Rekonstruiert ein belegtes Angebot. Fehlende Felder bleiben fehlend.""" + return Angebot( + titel=d["titel"], + haendler=d["haendler"], + quelle=d["quelle"], + abgerufen_am=datetime.fromisoformat(d["abgerufen_am"]), + marke=d.get("marke"), + preis=d.get("preis"), + grundpreis=d.get("grundpreis"), + menge=d.get("menge"), + gueltig_von=_dat(d.get("gueltig_von")), + gueltig_bis=_dat(d.get("gueltig_bis")), + angebot_id=d.get("angebot_id") or "", + ) + + +def pfad_fuer(plz: str, *, basis_dir: Path | str | None = None, jetzt: datetime | None = None) -> Path: + """Dateipfad pro PLZ/Kalenderwoche: data/roh/{plz}_{jahr}-W{woche}.json. + + Gleiche Wochen-Logik wie der marktguru-Cache -- so gehört der Roh-Stand + erkennbar zur selben Woche wie die zugrundeliegende Quelle. + """ + basis = Path(basis_dir) if basis_dir else STANDARD_BASIS + jetzt = jetzt or datetime.now() + jahr, woche, _ = jetzt.isocalendar() + return basis / f"{plz}_{jahr}-W{woche:02d}.json" + + +def speichere_rohdaten( + fetch: FetchErgebnis, *, basis_dir: Path | str | None = None, jetzt: datetime | None = None +) -> Path: + """Persistiert das belegte Fetch-Ergebnis. Gibt den Schreibpfad zurück. + + KEIN LLM, kein Auffüllen: es wird exakt das geschrieben, was der Fetch belegt + hat. `abgerufen_am` der Meta ist der Schreibzeitpunkt; die einzelnen Angebote + tragen ihren eigenen, von der Quelle belegten Abrufzeitpunkt. + """ + import json + + jetzt = jetzt or datetime.now() + pfad = pfad_fuer(fetch.ort_plz, basis_dir=basis_dir, jetzt=jetzt) + pfad.parent.mkdir(parents=True, exist_ok=True) + inhalt = { + "ort_plz": fetch.ort_plz, + "ort_name": fetch.ort_name, + "abgerufen_am": jetzt.isoformat(), + "abgedeckte_quellen": list(fetch.abgedeckte_quellen), + "gesehene_haendler": list(fetch.gesehene_haendler), + "hinweise": list(fetch.hinweise), + "angebote": [_angebot_dict(a) for a in fetch.angebote], + } + pfad.write_text(json.dumps(inhalt, ensure_ascii=False, indent=2), encoding="utf-8") + return pfad + + +def lade_rohdaten( + plz: str, *, basis_dir: Path | str | None = None, jetzt: datetime | None = None +) -> FetchErgebnis | None: + """Lädt den gespeicherten Roh-Stand der aktuellen Woche -- oder None. + + None bedeutet: für diese PLZ/Woche liegen keine Rohdaten vor (Stufe 2 ist + dann gesperrt). Es wird NICHT gefetcht und nichts geraten. + """ + import json + + pfad = pfad_fuer(plz, basis_dir=basis_dir, jetzt=jetzt) + if not pfad.exists(): + return None + d = json.loads(pfad.read_text(encoding="utf-8")) + angebote = tuple(_angebot_aus_dict(a) for a in d.get("angebote", [])) + return FetchErgebnis( + ort_plz=d["ort_plz"], + ort_name=d.get("ort_name"), + angebote=angebote, + abgedeckte_quellen=tuple(d.get("abgedeckte_quellen", ())), + gesehene_haendler=tuple(d.get("gesehene_haendler", ())), + hinweise=tuple(d.get("hinweise", ())), + ) + + +def meta_fuer( + plz: str, *, basis_dir: Path | str | None = None, jetzt: datetime | None = None +) -> dict | None: + """Kurz-Zusammenfassung des Roh-Stands (für die UI), ohne die volle Liste. + + Gibt None zurück, wenn keine Rohdaten vorliegen. + """ + import json + + pfad = pfad_fuer(plz, basis_dir=basis_dir, jetzt=jetzt) + if not pfad.exists(): + return None + d = json.loads(pfad.read_text(encoding="utf-8")) + return { + "ort_plz": d["ort_plz"], + "ort_name": d.get("ort_name"), + "abgerufen_am": d.get("abgerufen_am"), + "anzahl": len(d.get("angebote", [])), + "haendler": list(d.get("gesehene_haendler", [])), + "quellen": list(d.get("abgedeckte_quellen", [])), + "hinweise": list(d.get("hinweise", [])), + } + + +def rohliste_dicts(fetch: FetchErgebnis) -> list[dict]: + """Belegte Rohliste als JSON-fähige dicts -- für die UI-Anzeige von Stufe 1. + + Bewusst OHNE Produktgruppe: Stufe 1 kategorisiert nicht. + """ + out: list[dict] = [] + for a in fetch.angebote: + out.append( + { + "titel": a.titel, + "marke": a.marke, + "preis": a.preis, + "grundpreis": a.grundpreis, + "menge": a.menge, + "haendler": a.haendler, + "gueltig_von": _iso(a.gueltig_von), + "gueltig_bis": _iso(a.gueltig_bis), + "quelle": a.quelle, + } + ) + return out diff --git a/src/angebote/web.py b/src/angebote/web.py index 45e1257..7029d74 100644 --- a/src/angebote/web.py +++ b/src/angebote/web.py @@ -1,9 +1,15 @@ """FastAPI-Web-UI -- dünne Schicht über den bestehenden Modulen. -Der Schnitt bleibt unangetastet: dieser Server ruft `fetch` (deterministisch) -und `kategorisieren` (LLM) auf, vermischt aber nichts. Die UI ist reine -Präsentation; alle harten Regeln (kein Auffüllen, nur Belegtes, Abbruch statt -Drift) leben weiter in den darunterliegenden Modulen. +Der Schnitt bleibt unangetastet und ist hier sogar im Endpoint-Schnitt sichtbar: + + * Stufe 1 -- /api/rohdaten -- ruft NUR den deterministischen Fetch und + persistiert die belegten Rohdaten. KEIN Key, KEIN LLM. + * Stufe 2 -- /api/kategorisieren -- liest die gespeicherten Rohdaten und + führt ausschließlich darauf die LLM-Kategorisierung aus. Sie fetcht NICHT + erneut und ist gesperrt, solange keine Rohdaten vorliegen. + +Die UI ist reine Präsentation; alle harten Regeln (kein Auffüllen, nur Belegtes, +Abbruch statt Drift) leben weiter in den darunterliegenden Modulen. Start: OPENROUTER_API_KEY=... uvicorn angebote.web:app --port 8000 @@ -25,11 +31,12 @@ app = FastAPI(title="Angebots-Übersicht") _HTML = (Path(__file__).parent / "web_static" / "index.html").read_text("utf-8") -# In-memory Job-Store. Schlicht gehalten -- ein lokales Single-User-Werkzeug. +# In-memory Job-Store für Stufe 2 (LLM, läuft im Thread). Schlicht gehalten -- +# ein lokales Single-User-Werkzeug. _jobs: dict[str, dict] = {} _jobs_lock = threading.Lock() -# Ergebnis-Cache: identischer Lauf (PLZ, Modell, no_llm) kommt sofort, ohne +# Ergebnis-Cache: identische Kategorisierung (PLZ, Modell) kommt sofort, ohne # erneute LLM-Calls. Im Geist des Projekt-Cachings. In-memory, pro Serverlauf. _ergebnis_cache: dict[tuple, dict] = {} @@ -55,17 +62,105 @@ def api_modelle(q: str = "") -> list[dict]: ] -@app.post("/api/lauf") -def api_lauf(req: dict) -> dict: +# === Stufe 1: Rohdaten holen & speichern (deterministisch, ohne Key) ========= + + +@app.post("/api/rohdaten") +def api_rohdaten_holen(req: dict) -> dict: + """Holt belegte Rohdaten (Fetch) und persistiert sie pro PLZ/Woche. + + KEIN LLM, KEIN Key. Bei Spezifitätsmangel / Datenlage-Bruch wird der + AbbruchFehler ehrlich als 422 mit Ursache/Vorschlag weitergereicht -- + es wird nichts aufgefüllt. + """ plz = (req.get("plz") or "").strip() if not plz: raise HTTPException(status_code=400, detail="PLZ fehlt") + + from .fetch import hole_angebote + from .speicher import meta_fuer, rohliste_dicts, speichere_rohdaten + + try: + fetch = hole_angebote(plz) # deterministisch; AbbruchFehler bei Regel 4 + except AbbruchFehler as e: + raise HTTPException(status_code=422, detail=e.als_text()) + except Exception as e: # nichts verstecken -- ehrliche Fehlermeldung + raise HTTPException(status_code=502, detail=f"Unerwarteter Fehler: {e}") + + speichere_rohdaten(fetch) + # Frisch kategorisierte Ergebnisse dieser PLZ verwerfen -- Roh-Stand neu. + for schluessel in [k for k in _ergebnis_cache if k[0] == plz]: + _ergebnis_cache.pop(schluessel, None) + + meta = meta_fuer(plz) or {} + return { + "plz": plz, + "ort_name": meta.get("ort_name"), + "anzahl": meta.get("anzahl", len(fetch.angebote)), + "haendler": meta.get("haendler", list(fetch.gesehene_haendler)), + "quellen": meta.get("quellen", list(fetch.abgedeckte_quellen)), + "abgerufen_am": meta.get("abgerufen_am"), + "hinweise": meta.get("hinweise", list(fetch.hinweise)), + "angebote": rohliste_dicts(fetch), + } + + +@app.get("/api/rohdaten/{plz}") +def api_rohdaten_laden(plz: str) -> dict: + """Liefert den gespeicherten Roh-Stand der aktuellen Woche oder 404.""" + from .speicher import lade_rohdaten, meta_fuer, rohliste_dicts + + meta = meta_fuer(plz) + if meta is None: + raise HTTPException( + status_code=404, detail=f"keine gespeicherten Rohdaten für PLZ {plz}" + ) + fetch = lade_rohdaten(plz) + return { + "plz": plz, + "ort_name": meta.get("ort_name"), + "anzahl": meta.get("anzahl", 0), + "haendler": meta.get("haendler", []), + "quellen": meta.get("quellen", []), + "abgerufen_am": meta.get("abgerufen_am"), + "hinweise": meta.get("hinweise", []), + "angebote": rohliste_dicts(fetch) if fetch else [], + } + + +# === Stufe 2: Kategorisieren (LLM) -- erst NACH vorhandenen Rohdaten ========= + + +@app.post("/api/kategorisieren") +def api_kategorisieren(req: dict) -> dict: + """Startet die LLM-Kategorisierung auf den GESPEICHERTEN Rohdaten. + + Gesperrt (400), solange keine Rohdaten zur PLZ vorliegen. Liest sie aus der + Persistenz -- fetcht NICHT erneut. Status-Polling über /api/lauf/{job_id}. + """ + plz = (req.get("plz") or "").strip() + if not plz: + raise HTTPException(status_code=400, detail="PLZ fehlt") + + from .speicher import lade_rohdaten + + fetch = lade_rohdaten(plz) + if fetch is None: + raise HTTPException( + status_code=400, + detail=( + f"Keine Rohdaten für PLZ {plz}. Zuerst Stufe 1 'Rohdaten holen' " + "ausführen -- die Kategorisierung arbeitet nur auf belegten Daten." + ), + ) + modell = req.get("modell") or None - no_llm = bool(req.get("no_llm")) + anbieter = req.get("anbieter") or "openrouter" + key = req.get("key") or None job_id = uuid.uuid4().hex[:12] # Cache-Treffer? Dann sofort als fertiger Job ausliefern, kein neuer Lauf. - treffer = _ergebnis_cache.get((plz, modell, no_llm)) + treffer = _ergebnis_cache.get((plz, modell)) if treffer is not None: with _jobs_lock: _jobs[job_id] = { @@ -77,22 +172,15 @@ def api_lauf(req: dict) -> dict: with _jobs_lock: _jobs[job_id] = { "status": "laufend", - "phase": "fetch", + "phase": "kategorisieren", "done": 0, "total": 0, "ergebnis": None, "fehler": None, } t = threading.Thread( - target=_run_job, - args=( - job_id, - plz, - modell, - req.get("anbieter") or "openrouter", - no_llm, - req.get("key") or None, - ), + target=_run_kategorisieren, + args=(job_id, plz, fetch, modell, anbieter, key), daemon=True, ) t.start() @@ -107,38 +195,28 @@ def api_status(job_id: str) -> dict: return job -def _run_job(job_id, plz, modell, anbieter, no_llm, key) -> None: +def _run_kategorisieren(job_id, plz, fetch, modell, anbieter, key) -> None: + """LLM-Schritt im Hintergrund. Arbeitet auf den geladenen Rohdaten. + + Verändert die Angebotsdaten nicht (sie sind frozen); übernimmt vom Modell + nur Gruppe + Unsicherheits-Flag. + """ job = _jobs[job_id] try: - from .fetch import hole_angebote - from .modell import KategorisiertesAngebot + from .kategorisieren import baue_kategorisierer, kategorisiere from .uebersicht import als_struktur - fetch = hole_angebote(plz) # deterministisch; AbbruchFehler bei Regel 4 + kt = baue_kategorisierer(anbieter, modell, api_key=key) - if no_llm: - # Ohne LLM: belegte Rohliste, sichtbar als unkategorisiert markiert. - from .config import FALLBACK_GRUPPE + def fort(done, total): + job["done"] = done + job["total"] = total - kat = [ - KategorisiertesAngebot(a, FALLBACK_GRUPPE, unsicher=True) - for a in fetch.angebote - ] - else: - from .kategorisieren import baue_kategorisierer, kategorisiere - - job["phase"] = "kategorisieren" - kt = baue_kategorisierer(anbieter, modell, api_key=key) - - def fort(done, total): - job["done"] = done - job["total"] = total - - kat = kategorisiere(list(fetch.angebote), kt, fortschritt=fort) + kat = kategorisiere(list(fetch.angebote), kt, fortschritt=fort) job["ergebnis"] = als_struktur(fetch, kat) job["status"] = "fertig" - _ergebnis_cache[(plz, modell, no_llm)] = job["ergebnis"] + _ergebnis_cache[(plz, modell)] = job["ergebnis"] except AbbruchFehler as e: job["status"] = "fehler" job["fehler"] = e.als_text() diff --git a/src/angebote/web_static/index.html b/src/angebote/web_static/index.html index 55c4270..96089c7 100644 --- a/src/angebote/web_static/index.html +++ b/src/angebote/web_static/index.html @@ -6,113 +6,297 @@ Angebots-Übersicht
-

Angebots-Übersicht

-

Ortskonkret, händlerübergreifend, nach Produktgruppen. Daten deterministisch aus marktguru, - Einordnung per LLM. Jedes Angebot ist belegt — kein Auffüllen, Unsicheres ist markiert.

+
+

Ortskonkret · händlerübergreifend · belegt

+

Angebots-Übersicht

+

Zwei strikt getrennte Stufen: zuerst die Rohdaten deterministisch holen + und speichern (kein LLM, kein Key), danach erst per LLM in Produktgruppen + einordnen. Jedes Angebot ist belegt — kein Auffüllen, Unsicheres ist markiert.

+
-
-
-
- - -
-
-
-
-
- -
- -
+
+ + +
+
+
1
+
+

Rohdaten holen & speichern

+

Deterministischer Abruf für eine PLZ. Wird pro PLZ/Woche auf Platte gespeichert.

+
+
deterministisch · kein LLM
+
+
+
+
+ + +
+
+ + +
+
+
+
+
+
+ + +
+ + + + OpenRouter-Konfiguration + gilt für Stufe 2 + +
+
+
+ + +
+
+ + +
+
+ + +
+
+
+
+ + +
+
+

Der Key wird ausschließlich für Stufe 2 verwendet und nie in Stufe 1 (Datenabruf) eingesetzt.

+
+
+ + +
+
+
2
+
+

Kategorisieren

+

Ordnet die gespeicherten Rohdaten per LLM in Produktgruppen. Verändert keine Angebotsdaten.

+
+
LLM-Schritt
+
+
+
+
+ + +
+
+
🔒 Erst aktiv, sobald für die PLZ Rohdaten gespeichert sind (Stufe 1).
+
+
+
+
-
- +