From 2029eb9fcff31070dec9544aafafe6d4afcac986 Mon Sep 17 00:00:00 2001 From: Jeuner <62662523+Jeuners@users.noreply.github.com> Date: Wed, 3 Jun 2026 18:15:14 +0200 Subject: [PATCH] Kategorisierung parallelisieren (bis zu 8 Batches gleichzeitig) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Die 77 LLM-Calls liefen bisher sequenziell -> bei langsamer Modell-Latenz minutenlang. Jetzt ThreadPoolExecutor (parallel=8); id-basiertes Mapping ist reihenfolge-unabhängig, Logik unverändert. Voller deepseek-Lauf: 162s statt sequenziell ~20min bei der heutigen Latenz (~16s/Call). Schnelle Modelle (gemini-flash) entsprechend ~15-20s. +1 Test (parallel ordnet alle Batches vollständig zu). 58 Tests grün. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/angebote/kategorisieren.py | 66 ++++++++++++++++-------- tests/test_kategorisieren_integritaet.py | 17 ++++++ 2 files changed, 61 insertions(+), 22 deletions(-) diff --git a/src/angebote/kategorisieren.py b/src/angebote/kategorisieren.py index e621965..d6a240d 100644 --- a/src/angebote/kategorisieren.py +++ b/src/angebote/kategorisieren.py @@ -39,40 +39,62 @@ def kategorisiere( *, batch_groesse: int = 25, fortschritt=None, + parallel: int = 8, ) -> list[KategorisiertesAngebot]: """Ordnet jedes Angebot einer Produktgruppe zu, ohne Daten zu verändern. `fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches) -- - für Live-Anzeigen (z. B. die Web-UI). Ändert die Logik nicht. + für Live-Anzeigen (z. B. die Web-UI). + `parallel`: bis zu so viele Batches gleichzeitig (LLM-Calls sind I/O-bound). + Ändert die Logik nicht -- nur die Ausführungs-Reihenfolge. Die Zuordnung + erfolgt id-basiert, ist also reihenfolge-unabhängig. """ + import threading + original = {a.angebot_id: a for a in angebote} ergebnis: dict[str, KategorisiertesAngebot] = {} + lock = threading.Lock() - import math + batches = [ + angebote[start : start + batch_groesse] + for start in range(0, len(angebote), batch_groesse) + ] + gesamt_batches = max(1, len(batches)) + erledigt = [0] - gesamt_batches = max(1, math.ceil(len(angebote) / batch_groesse)) - for nr, start in enumerate(range(0, len(angebote), batch_groesse), 1): - batch = angebote[start : start + batch_groesse] + def verarbeite(batch): posten = [ - { - "id": a.angebot_id, - "titel": a.titel, - "marke": a.marke, - "menge": a.menge, - } + {"id": a.angebot_id, "titel": a.titel, "marke": a.marke, "menge": a.menge} for a in batch ] - antworten = kategorisierer.klassifiziere(posten) - for ant in antworten: - aid = ant.get("id") - if aid not in original or aid in ergebnis: - continue # fremde/duplizierte ID ignorieren - gruppe, unsicher = _bereinige_gruppe(ant.get("gruppe"), ant.get("unsicher")) - ergebnis[aid] = KategorisiertesAngebot( - angebot=original[aid], gruppe=gruppe, unsicher=unsicher - ) - if fortschritt is not None: - fortschritt(nr, gesamt_batches) + return kategorisierer.klassifiziere(posten) + + def uebernehmen(antworten): + with lock: + for ant in antworten: + aid = ant.get("id") + if aid not in original or aid in ergebnis: + continue # fremde/duplizierte ID ignorieren + gruppe, unsicher = _bereinige_gruppe( + ant.get("gruppe"), ant.get("unsicher") + ) + ergebnis[aid] = KategorisiertesAngebot( + angebot=original[aid], gruppe=gruppe, unsicher=unsicher + ) + erledigt[0] += 1 + if fortschritt is not None: + fortschritt(erledigt[0], gesamt_batches) + + if parallel > 1 and len(batches) > 1: + from concurrent.futures import ThreadPoolExecutor, as_completed + + with ThreadPoolExecutor(max_workers=parallel) as ex: + futures = [ex.submit(verarbeite, b) for b in batches] + for fut in as_completed(futures): + uebernehmen(fut.result()) # AbbruchFehler propagiert bewusst + else: + for batch in batches: + uebernehmen(verarbeite(batch)) # Posten, die das Modell nicht (gültig) beantwortet hat: ehrlich als # unsicher mit Fallback markieren -- nicht still einsortieren. diff --git a/tests/test_kategorisieren_integritaet.py b/tests/test_kategorisieren_integritaet.py index fcb444d..70f949b 100644 --- a/tests/test_kategorisieren_integritaet.py +++ b/tests/test_kategorisieren_integritaet.py @@ -35,6 +35,23 @@ def test_originaldaten_unveraendert_property(): assert nachher is vorher +def test_parallel_ordnet_alle_batches_vollstaendig_zu(): + # 60 Angebote -> 3 Batches -> parallel verarbeitet; nichts darf verloren gehen. + angebote = [beispiel_angebot(f"Artikel {i}", preis=float(i)) for i in range(60)] + fake = FakeKategorisierer( + lambda posten: [ + {"id": p["id"], "gruppe": "Sonstiges", "unsicher": False} for p in posten + ] + ) + ergebnis = kategorisiere(angebote, fake, batch_groesse=25, parallel=4) + assert len(ergebnis) == 60 + assert all(k.gruppe == "Sonstiges" and not k.unsicher for k in ergebnis) + # jedes Original-Angebot bleibt unverändert erhalten (id-basiert gemappt) + ids_in = {a.angebot_id for a in angebote} + ids_out = {k.angebot.angebot_id for k in ergebnis} + assert ids_in == ids_out + + def test_fehlender_preis_bleibt_fehlend(): angebot = beispiel_angebot("Hähnchen", preis=None) ergebnis = kategorisiere([angebot], _gib_gruppe("Fleisch & Wurst"))