diff --git a/backend/app/services/interviews/delphi.py b/backend/app/services/interviews/delphi.py
new file mode 100644
index 00000000..be455ae9
--- /dev/null
+++ b/backend/app/services/interviews/delphi.py
@@ -0,0 +1,345 @@
+"""Three-round Delphi survey administered to LLM-simulated personas.
+
+Round 1 asks the open questions of a YAML instrument, round 2 asks for 1-5
+importance/plausibility ratings of a list of themes, and round 3 asks for
+revised ratings after showing group medians and IQRs. Module-level helpers
+extract themes from round-1 answers and compute the group statistics.
+"""
+from __future__ import annotations
+
+import json
+import statistics
+from pathlib import Path
+from typing import Optional
+
+import yaml
+
+from app.models.interview import (
+    DelphiOpenResponse, DelphiRatingResponse,
+)
+from app.services.interviews.base import StakeholderInterviewer, PersonaRecord
+
+# Each theme is rated on these two dimensions, on an integer 1-5 scale.
+_RATING_KEYS = ("importance", "plausibility")
+_RATING_MIN = 1
+_RATING_MAX = 5
+
+
+class DelphiSubagent:
+    """Run the individual Delphi rounds for one persona at a time."""
+
+    def __init__(self, llm, memory, instrument_path: Path, language: str = "de"):
+        """Load the YAML instrument and create the in-character interviewer.
+
+        Args:
+            llm: Chat client handed to ``StakeholderInterviewer``.
+            memory: Memory provider handed to ``StakeholderInterviewer``.
+            instrument_path: YAML file with a ``questions`` list; each entry
+                needs ``question_id`` plus ``de`` and ``en`` texts.
+            language: ``"de"`` selects the German texts, anything else English.
+
+        Raises:
+            ValueError: If the instrument has no non-empty ``questions`` list.
+        """
+        with Path(instrument_path).open("r", encoding="utf-8") as f:
+            self.instrument = yaml.safe_load(f)
+        # Fail here rather than with a TypeError/KeyError deep inside round 1.
+        questions = (
+            self.instrument.get("questions")
+            if isinstance(self.instrument, dict) else None
+        )
+        if not isinstance(questions, list) or not questions:
+            raise ValueError(
+                f"Delphi instrument {instrument_path} has no 'questions' list"
+            )
+        self.interviewer = StakeholderInterviewer(llm=llm, memory=memory, language=language)
+        self.llm = llm
+        self.language = language
+
+    # --- Round 1: open questions ---
+    def _r1_schema(self) -> str:
+        """Return the JSON skeleton mapping every question_id to an empty string."""
+        return json.dumps({
+            "answers": {q["question_id"]: "" for q in self.instrument["questions"]}
+        }, ensure_ascii=False)
+
+    def _r1_prompt(self) -> str:
+        """Return an intro line plus one ``[question_id] text`` line per question."""
+        lines = ["Bitte beantworten Sie offen:" if self.language == "de" else "Please answer openly:"]
+        for q in self.instrument["questions"]:
+            txt = q["de"] if self.language == "de" else q["en"]
+            lines.append(f"[{q['question_id']}] {txt}")
+        return "\n".join(lines)
+
+    def _r1_validate(self, raw: dict) -> Optional[dict]:
+        """Return ``raw`` if ``raw["answers"]`` covers every question, else ``None``."""
+        if not isinstance(raw, dict):
+            return None
+        ans = raw.get("answers")
+        if not isinstance(ans, dict):
+            return None
+        required = {q["question_id"] for q in self.instrument["questions"]}
+        if not required.issubset(ans.keys()):
+            return None
+        return raw
+
+    def administer_round1(self, persona: PersonaRecord) -> DelphiOpenResponse:
+        """Ask ``persona`` the open questions and return its answers as strings."""
+        raw = self.interviewer.ask_in_character(
+            persona, user_prompt=self._r1_prompt(),
+            schema_hint=self._r1_schema(), validate=self._r1_validate,
+        )
+        return DelphiOpenResponse(
+            agent_id=persona.agent_id, round=1,
+            answers={k: str(v) for k, v in raw["answers"].items()},
+        )
+
+    # --- Round 2: rate themes ---
+    def _r2_schema(self, theme_ids: list[str]) -> str:
+        """Return the JSON skeleton with empty rating slots for every theme id."""
+        return json.dumps({
+            "ratings": {tid: {"importance": "", "plausibility": ""} for tid in theme_ids}
+        }, ensure_ascii=False)
+
+    def _r2_prompt(self, themes: list[dict]) -> str:
+        """Return the rating instruction plus one ``- [theme_id] label`` line per theme."""
+        head = "Bewerten Sie jedes Thema nach Wichtigkeit (1-5) und Plausibilität (1-5):" if self.language == "de" \
+            else "Rate each theme on importance (1-5) and plausibility (1-5):"
+        body = [f"- [{t['theme_id']}] {t['label']}" for t in themes]
+        return head + "\n" + "\n".join(body)
+
+    def _r2_validate(self, theme_ids: list[str]):
+        """Return a validator for rating payloads covering exactly ``theme_ids``.
+
+        The validator returns its argument when ``raw["ratings"]`` is a dict
+        keyed by exactly ``theme_ids`` whose values hold integer importance
+        and plausibility scores in 1-5; otherwise it returns ``None``.
+        """
+        expected = set(theme_ids)
+
+        def v(raw: dict) -> Optional[dict]:
+            if not isinstance(raw, dict):
+                return None
+            ratings = raw.get("ratings")
+            # A list/null/string "ratings" must be rejected, not crash on .keys().
+            if not isinstance(ratings, dict) or set(ratings) != expected:
+                return None
+            for r in ratings.values():
+                if not isinstance(r, dict):
+                    return None
+                for key in _RATING_KEYS:
+                    score = r.get(key)
+                    # bool is a subclass of int; JSON true/false is not a rating.
+                    if isinstance(score, bool) or not isinstance(score, int):
+                        return None
+                    if not _RATING_MIN <= score <= _RATING_MAX:
+                        return None
+            return raw
+        return v
+
+    def administer_round2(self, persona: PersonaRecord, themes: list[dict]) -> DelphiRatingResponse:
+        """Ask ``persona`` to rate every theme and return the ratings it gave."""
+        theme_ids = [t["theme_id"] for t in themes]
+        raw = self.interviewer.ask_in_character(
+            persona, user_prompt=self._r2_prompt(themes),
+            schema_hint=self._r2_schema(theme_ids), validate=self._r2_validate(theme_ids),
+        )
+        return DelphiRatingResponse(
+            agent_id=persona.agent_id, round=2,
+            ratings={k: dict(v) for k, v in raw["ratings"].items()},
+        )
+
+    # --- Round 3: revise after seeing group stats ---
+    def administer_round3(
+        self, persona: PersonaRecord, themes: list[dict], group_stats: dict, own_r2: DelphiRatingResponse
+    ) -> DelphiRatingResponse:
+        """Show group statistics and own round-2 scores, then collect revised ratings.
+
+        ``group_stats`` is read with the keys ``imp_median``, ``imp_iqr``,
+        ``plaus_median`` and ``plaus_iqr`` per theme id; values missing from
+        it or from ``own_r2`` are rendered as ``None`` in the prompt.
+        """
+        theme_ids = [t["theme_id"] for t in themes]
+        head = ("Sie sehen unten die anonymisierten Gruppenwerte (Median, IQR). "
+                "Bitte überarbeiten Sie Ihre Bewertungen, wenn Sie möchten, und begründen Sie kurz.") \
+            if self.language == "de" else \
+            ("Below are the anonymised group values (median, IQR). "
+             "Please revise your ratings if you wish and add a short justification.")
+        ctx_lines = []
+        for t in themes:
+            tid = t["theme_id"]
+            gs = group_stats.get(tid, {})
+            own = own_r2.ratings.get(tid, {})
+            ctx_lines.append(
+                f"[{tid}] {t['label']} — group importance median={gs.get('imp_median')}, "
+                f"IQR={gs.get('imp_iqr')}; plausibility median={gs.get('plaus_median')}, "
+                f"IQR={gs.get('plaus_iqr')}. Your R2: imp={own.get('importance')}, plaus={own.get('plausibility')}."
+            )
+        prompt = head + "\n\n" + "\n".join(ctx_lines)
+        schema = json.dumps({
+            "ratings": {tid: {"importance": "", "plausibility": ""} for tid in theme_ids},
+            "justification": "",
+        }, ensure_ascii=False)
+
+        # Round 3 applies the same rating rules as round 2, so reuse that
+        # validator instead of keeping a second copy that can drift.
+        raw = self.interviewer.ask_in_character(
+            persona, user_prompt=prompt,
+            schema_hint=schema, validate=self._r2_validate(theme_ids),
+        )
+        justification = raw.get("justification")
+        return DelphiRatingResponse(
+            agent_id=persona.agent_id, round=3,
+            ratings={k: dict(v) for k, v in raw["ratings"].items()},
+            # Stringify like round-1 answers; an absent justification stays None.
+            justification=None if justification is None else str(justification),
+        )
+
+
+def extract_themes(round1: list[DelphiOpenResponse], llm, max_themes: int = 12) -> list[dict]:
+    """Ask the LLM to code round-1 answers into at most ``max_themes`` themes.
+
+    Args:
+        round1: Round-1 responses; every answer of every agent is sent.
+        llm: Client exposing ``chat_json(messages=..., temperature=...)``.
+        max_themes: Upper bound stated in the prompt and enforced on the result.
+
+    Returns:
+        A list of ``{"theme_id": str, "label": str}`` dicts with unique ids.
+        Entries without a ``label`` are dropped; a missing or duplicate
+        ``theme_id`` is replaced by a generated ``theme_<n>``.
+    """
+    text_blocks = [
+        f"[agent {r.agent_id} {qid}] {ans}"
+        for r in round1
+        for qid, ans in r.answers.items()
+    ]
+    schema = json.dumps({"themes": [{"theme_id": "", "label": ""}]}, ensure_ascii=False)
+    messages = [
+        {"role": "system", "content":
+            "You extract distinct thematic codes from open-ended German fisheries survey responses. "
+            f"Return JSON ONLY matching: {schema}. Use stable theme_ids of form theme_0, theme_1, …"},
+        {"role": "user", "content":
+            "Responses:\n" + "\n".join(text_blocks) + f"\n\nReturn up to {max_themes} distinct themes."},
+    ]
+    raw = llm.chat_json(messages=messages, temperature=0.0)
+    themes = raw.get("themes") if isinstance(raw, dict) else None
+    if not isinstance(themes, list):
+        return []
+    out: list[dict] = []
+    seen: set[str] = set()
+    for i, t in enumerate(themes):
+        if len(out) >= max_themes:
+            break
+        if not isinstance(t, dict) or "label" not in t:
+            continue
+        theme_id = str(t.get("theme_id") or f"theme_{i}")
+        # Ratings are keyed by theme_id, so two themes sharing an id would
+        # collapse into one; give the later one a fresh generated id.
+        n = i
+        while theme_id in seen:
+            theme_id = f"theme_{n}"
+            n += 1
+        seen.add(theme_id)
+        out.append({"theme_id": theme_id, "label": str(t["label"])})
+    return out
+
+
+def _iqr(xs: list[float]) -> float:
+    """Return the spread of ``xs``.
+
+    Empty input gives 0.0, fewer than four values give max - min, and four or
+    more give Q3 - Q1 from ``statistics.quantiles(xs, n=4)``.
+    """
+    if not xs:
+        return 0.0
+    xs = sorted(xs)
+    if len(xs) < 4:
+        return xs[-1] - xs[0]
+    q1, _, q3 = statistics.quantiles(xs, n=4)
+    return q3 - q1
+
+
+def _theme_scores(responses, theme: str, key: str) -> list:
+    """Collect the ``key`` scores for ``theme`` from the responses that rated it."""
+    return [r.ratings[theme][key] for r in responses if theme in r.ratings]
+
+
+def _median_or_none(xs: list):
+    """Return the median of ``xs``, or ``None`` when ``xs`` is empty."""
+    return statistics.median(xs) if xs else None
+
+
+def _iqr_delta(before: list, after: list):
+    """Return ``_iqr(after) - _iqr(before)``, or ``None`` if either sample is empty.
+
+    ``_iqr`` reports 0.0 for an empty sample, so without this guard a theme
+    rated in only one round would show up as a change in spread.
+    """
+    if not before or not after:
+        return None
+    return _iqr(after) - _iqr(before)
+
+
+def convergence_metrics(r2: list[DelphiRatingResponse], r3: list[DelphiRatingResponse]) -> dict:
+    """Compare per-theme medians and IQRs between round 2 and round 3.
+
+    Responses are de-duplicated per ``agent_id`` (the last one wins) and the
+    themes of both rounds are reported in sorted order.
+
+    Returns:
+        ``{theme_id: metrics}`` where ``metrics`` holds, for both the ``imp``
+        and ``plaus`` prefix, ``<prefix>_median_r2``, ``<prefix>_median_r3``,
+        ``<prefix>_iqr_r2`` and ``<prefix>_iqr_r3``, plus
+        ``delta_iqr_importance`` and ``delta_iqr_plausibility`` (round 3 minus
+        round 2). Medians and deltas are ``None`` when a round has no rating
+        for the theme.
+    """
+    by_r2 = {r.agent_id: r for r in r2}
+    by_r3 = {r.agent_id: r for r in r3}
+    themes: set[str] = set()
+    for r in r2 + r3:
+        themes.update(r.ratings.keys())
+    out: dict[str, dict] = {}
+    for t in sorted(themes):
+        imp_r2 = _theme_scores(by_r2.values(), t, "importance")
+        imp_r3 = _theme_scores(by_r3.values(), t, "importance")
+        plaus_r2 = _theme_scores(by_r2.values(), t, "plausibility")
+        plaus_r3 = _theme_scores(by_r3.values(), t, "plausibility")
+        out[t] = {
+            "imp_median_r2": _median_or_none(imp_r2),
+            "imp_median_r3": _median_or_none(imp_r3),
+            "imp_iqr_r2": _iqr(imp_r2),
+            "imp_iqr_r3": _iqr(imp_r3),
+            "delta_iqr_importance": _iqr_delta(imp_r2, imp_r3),
+            "plaus_median_r2": _median_or_none(plaus_r2),
+            "plaus_median_r3": _median_or_none(plaus_r3),
+            "plaus_iqr_r2": _iqr(plaus_r2),
+            "plaus_iqr_r3": _iqr(plaus_r3),
+            "delta_iqr_plausibility": _iqr_delta(plaus_r2, plaus_r3),
+        }
+    return out
+
+
+def group_stats_from_r2(r2: list[DelphiRatingResponse]) -> dict:
+    """Return the per-theme median and IQR of the round-2 ratings.
+
+    Returns:
+        ``{theme_id: {"imp_median", "imp_iqr", "plaus_median", "plaus_iqr"}}``
+        with themes in sorted order, so the result is reproducible across
+        runs (iteration order of a set of strings is not).
+    """
+    themes: set[str] = set()
+    for r in r2:
+        themes.update(r.ratings.keys())
+    stats: dict[str, dict] = {}
+    for t in sorted(themes):
+        imps = _theme_scores(r2, t, "importance")
+        plauss = _theme_scores(r2, t, "plausibility")
+        stats[t] = {
+            "imp_median": _median_or_none(imps),
+            "imp_iqr": _iqr(imps),
+            "plaus_median": _median_or_none(plauss),
+            "plaus_iqr": _iqr(plauss),
+        }
+    return stats
diff --git a/backend/scripts/instruments/delphi_v1.yaml b/backend/scripts/instruments/delphi_v1.yaml
new file mode 100644
index 00000000..bb7650dc
--- /dev/null
+++ b/backend/scripts/instruments/delphi_v1.yaml
@@ -0,0 +1,9 @@
+name: delphi_v1
+version: "1.0"
+language_default: de
+rounds: 3
+questions:
+  - {question_id: q1, de: "Welche drei Faktoren werden die deutsche Fischerei bis 2040 am stärksten prägen?", en: "Which three factors will most shape German fisheries by 2040?"}
+  - {question_id: q2, de: "Welche Akteurinnen und Akteure sind heute entscheidend, werden aber unterschätzt?", en: "Which actors are decisive today but underestimated?"}
+  - {question_id: q3, de: "Was sollte sich in den nächsten fünf Jahren ändern, damit die Fischerei eine Zukunft hat?", en: "What should change in the next five years for fisheries to have a future?"}
+  - {question_id: q4, de: "Welcher Trend macht Ihnen am meisten Hoffnung – und welcher am meisten Sorge?", en: "Which trend gives you most hope — and which most concern?"}
diff --git a/backend/tests/interviews/test_delphi.py b/backend/tests/interviews/test_delphi.py
new file mode 100644
index 00000000..c01ecfb8
--- /dev/null
+++ b/backend/tests/interviews/test_delphi.py
@@ -0,0 +1,82 @@
+from pathlib import Path
+from app.services.interviews.base import PersonaRecord, MemoryDigest
+from app.services.interviews.delphi import (
+    DelphiSubagent, extract_themes, convergence_metrics,
+)
+
+INSTRUMENT = Path(__file__).resolve().parents[2] / "scripts" / "instruments" / "delphi_v1.yaml"
+
+class _Mem:
+    # Memory stub: always reports an available one-character digest.
+    def get_digest(self, agent_id, max_chars=2000):
+        return MemoryDigest(text="x", available=True)
+
+class _R1LLM:
+    # LLM stub answering all four instrument questions.
+    def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw):
+        return {"answers": {
+            "q1": "Klimawandel, Quoten, Generationswechsel",
+            "q2": "MSC, Aquakultur",
+            "q3": "Russland, EU-Politik",
+            "q4": "Verbraucherpreise",
+        }}
+
+class _R2LLM:
+    # LLM stub rating theme_0..theme_4 with fixed in-range scores.
+    def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw):
+        return {"ratings": {f"theme_{i}": {"importance": 4, "plausibility": 3} for i in range(5)}}
+
+class _ExtractLLM:
+    # LLM stub returning five distinct themes.
+    def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw):
+        return {"themes": [
+            {"theme_id": "theme_0", "label": "Klimawandel"},
+            {"theme_id": "theme_1", "label": "Quoten"},
+            {"theme_id": "theme_2", "label": "MSC"},
+            {"theme_id": "theme_3", "label": "EU-Politik"},
+            {"theme_id": "theme_4", "label": "Generationswechsel"},
+        ]}
+
+def test_delphi_round1_open():
+    sub = DelphiSubagent(llm=_R1LLM(), memory=_Mem(), instrument_path=INSTRUMENT)
+    persona = PersonaRecord(agent_id=2, name="A", persona="p")
+    resp = sub.administer_round1(persona)
+    assert resp.round == 1
+    assert len(resp.answers) == 4
+
+def test_delphi_round2_ratings():
+    sub = DelphiSubagent(llm=_R2LLM(), memory=_Mem(), instrument_path=INSTRUMENT)
+    persona = PersonaRecord(agent_id=2, name="A", persona="p")
+    themes = [{"theme_id": f"theme_{i}", "label": f"label {i}"} for i in range(5)]
+    resp = sub.administer_round2(persona, themes)
+    assert resp.round == 2
+    assert set(resp.ratings) == {f"theme_{i}" for i in range(5)}
+
+def test_r2_validate_rejects_malformed_ratings():
+    sub = DelphiSubagent(llm=_R2LLM(), memory=_Mem(), instrument_path=INSTRUMENT)
+    validate = sub._r2_validate(["theme_0"])
+    ok = {"ratings": {"theme_0": {"importance": 4, "plausibility": 3}}}
+    assert validate(ok) is ok
+    assert validate({"ratings": ["theme_0"]}) is None
+    assert validate({"ratings": {"theme_0": {"importance": True, "plausibility": 3}}}) is None
+    assert validate({"ratings": {"theme_0": {"importance": 6, "plausibility": 3}}}) is None
+
+def test_extract_themes_aggregates():
+    from app.models.interview import DelphiOpenResponse
+    r1 = [DelphiOpenResponse(agent_id=i, answers={"q1": "Klimawandel", "q2": "MSC"}) for i in range(3)]
+    themes = extract_themes(r1, llm=_ExtractLLM())
+    assert len(themes) == 5
+    assert all("theme_id" in t for t in themes)
+
+def test_convergence_metrics():
+    from app.models.interview import DelphiRatingResponse
+    r2 = [DelphiRatingResponse(agent_id=i, round=2,
+                               ratings={"t1": {"importance": 3, "plausibility": 3}}) for i in range(5)]
+    r3 = [DelphiRatingResponse(agent_id=i, round=3,
+                               ratings={"t1": {"importance": 4, "plausibility": 4}}) for i in range(5)]
+    conv = convergence_metrics(r2, r3)
+    assert "t1" in conv
+    assert conv["t1"]["delta_iqr_importance"] is not None
+    assert conv["t1"]["imp_median_r2"] == 3
+    assert conv["t1"]["imp_median_r3"] == 4
+    assert conv["t1"]["delta_iqr_importance"] == 0