Kategorisierung parallelisieren (bis zu 8 Batches gleichzeitig)

Die 77 LLM-Calls liefen bisher sequenziell -> bei langsamer Modell-Latenz
minutenlang. Jetzt ThreadPoolExecutor (parallel=8); id-basiertes Mapping ist
reihenfolge-unabhängig, Logik unverändert. Voller deepseek-Lauf: 162s statt
sequenziell ~20min bei der heutigen Latenz (~16s/Call). Schnelle Modelle
(gemini-flash) entsprechend ~15-20s. +1 Test (parallel ordnet alle Batches
vollständig zu). 58 Tests grün.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jeuner 2026-06-03 18:15:14 +02:00
parent aa60331f7f
commit 2029eb9fcf
2 changed files with 61 additions and 22 deletions

View file

@ -39,40 +39,62 @@ def kategorisiere(
*,
batch_groesse: int = 25,
fortschritt=None,
parallel: int = 8,
) -> list[KategorisiertesAngebot]:
"""Ordnet jedes Angebot einer Produktgruppe zu, ohne Daten zu verändern.
`fortschritt`: optionaler Callback (erledigte_batches, gesamt_batches) --
für Live-Anzeigen (z. B. die Web-UI). Ändert die Logik nicht.
für Live-Anzeigen (z. B. die Web-UI).
`parallel`: bis zu so viele Batches gleichzeitig (LLM-Calls sind I/O-bound).
Ändert die Logik nicht -- nur die Ausführungs-Reihenfolge. Die Zuordnung
erfolgt id-basiert, ist also reihenfolge-unabhängig.
"""
import threading
original = {a.angebot_id: a for a in angebote}
ergebnis: dict[str, KategorisiertesAngebot] = {}
lock = threading.Lock()
import math
batches = [
angebote[start : start + batch_groesse]
for start in range(0, len(angebote), batch_groesse)
]
gesamt_batches = max(1, len(batches))
erledigt = [0]
gesamt_batches = max(1, math.ceil(len(angebote) / batch_groesse))
for nr, start in enumerate(range(0, len(angebote), batch_groesse), 1):
batch = angebote[start : start + batch_groesse]
def verarbeite(batch):
posten = [
{
"id": a.angebot_id,
"titel": a.titel,
"marke": a.marke,
"menge": a.menge,
}
{"id": a.angebot_id, "titel": a.titel, "marke": a.marke, "menge": a.menge}
for a in batch
]
antworten = kategorisierer.klassifiziere(posten)
for ant in antworten:
aid = ant.get("id")
if aid not in original or aid in ergebnis:
continue # fremde/duplizierte ID ignorieren
gruppe, unsicher = _bereinige_gruppe(ant.get("gruppe"), ant.get("unsicher"))
ergebnis[aid] = KategorisiertesAngebot(
angebot=original[aid], gruppe=gruppe, unsicher=unsicher
)
if fortschritt is not None:
fortschritt(nr, gesamt_batches)
return kategorisierer.klassifiziere(posten)
def uebernehmen(antworten):
with lock:
for ant in antworten:
aid = ant.get("id")
if aid not in original or aid in ergebnis:
continue # fremde/duplizierte ID ignorieren
gruppe, unsicher = _bereinige_gruppe(
ant.get("gruppe"), ant.get("unsicher")
)
ergebnis[aid] = KategorisiertesAngebot(
angebot=original[aid], gruppe=gruppe, unsicher=unsicher
)
erledigt[0] += 1
if fortschritt is not None:
fortschritt(erledigt[0], gesamt_batches)
if parallel > 1 and len(batches) > 1:
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=parallel) as ex:
futures = [ex.submit(verarbeite, b) for b in batches]
for fut in as_completed(futures):
uebernehmen(fut.result()) # AbbruchFehler propagiert bewusst
else:
for batch in batches:
uebernehmen(verarbeite(batch))
# Posten, die das Modell nicht (gültig) beantwortet hat: ehrlich als
# unsicher mit Fallback markieren -- nicht still einsortieren.

View file

@ -35,6 +35,23 @@ def test_originaldaten_unveraendert_property():
assert nachher is vorher
def test_parallel_ordnet_alle_batches_vollstaendig_zu():
# 60 Angebote -> 3 Batches -> parallel verarbeitet; nichts darf verloren gehen.
angebote = [beispiel_angebot(f"Artikel {i}", preis=float(i)) for i in range(60)]
fake = FakeKategorisierer(
lambda posten: [
{"id": p["id"], "gruppe": "Sonstiges", "unsicher": False} for p in posten
]
)
ergebnis = kategorisiere(angebote, fake, batch_groesse=25, parallel=4)
assert len(ergebnis) == 60
assert all(k.gruppe == "Sonstiges" and not k.unsicher for k in ergebnis)
# jedes Original-Angebot bleibt unverändert erhalten (id-basiert gemappt)
ids_in = {a.angebot_id for a in angebote}
ids_out = {k.angebot.angebot_id for k in ergebnis}
assert ids_in == ids_out
def test_fehlender_preis_bleibt_fehlend():
angebot = beispiel_angebot("Hähnchen", preis=None)
ergebnis = kategorisiere([angebot], _gib_gruppe("Fleisch & Wurst"))