diff --git a/docs/superpowers/plans/2026-05-23-stakeholder-interview-subagents.md b/docs/superpowers/plans/2026-05-23-stakeholder-interview-subagents.md new file mode 100644 index 00000000..4de7f7c6 --- /dev/null +++ b/docs/superpowers/plans/2026-05-23-stakeholder-interview-subagents.md @@ -0,0 +1,3837 @@ +# Stakeholder Interview Subagents Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build a four-subagent post-simulation interview system (Longitudinal, Diversity, Delphi, Scenario) over MiroFish-simulated stakeholders, plus a cross-method synthesiser, exposed via `/api/interview` and rendered in a new Vue Step4b. + +**Architecture:** Deterministic instrument runners (not ReACT). Shared `StakeholderInterviewer` base loads persona + Zep memory digest and administers per-instrument JSON-schema-validated prompts via the existing `LLMClient`. Four subagents own their own instrument YAML + output schema. `InterviewOrchestrator` fans out parallel post-sim execution; `InterviewSynthesizer` aggregates. Files: backend Python services + new Flask blueprint; frontend new Vue component with d3 viz. + +**Tech Stack:** Python 3.12, Flask, pydantic v2, PyYAML, scikit-learn (PCA, k-means), scipy (Wilcoxon), numpy, pytest; Vue 3, axios, d3 v7, vue-i18n. 
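The fan-out mentioned under **Architecture** can be pictured with a small thread-pool sketch. This is only an illustration under assumptions: `fan_out`, the runner callables, and the result shape are hypothetical stand-ins, not the plan's `InterviewOrchestrator` API. The default of 8 workers mirrors `INTERVIEW_MAX_WORKERS`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def fan_out(
    runners: dict[str, Callable[[int], dict]],
    agent_ids: list[int],
    max_workers: int = 8,
) -> dict[str, list[dict]]:
    """Run every runner for every agent in parallel (hypothetical helper).

    Results are returned per runner, in the same order as agent_ids.
    An exception inside a runner is re-raised when its result is read.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            name: [pool.submit(run, aid) for aid in agent_ids]
            for name, run in runners.items()
        }
        return {name: [f.result() for f in fs] for name, fs in futures.items()}


# Stand-in runners; real subagents would call the LLM here.
runners = {
    "longitudinal": lambda aid: {"agent_id": aid, "kind": "longitudinal"},
    "diversity": lambda aid: {"agent_id": aid, "kind": "diversity"},
}
results = fan_out(runners, agent_ids=[1, 2, 3])
print([r["agent_id"] for r in results["diversity"]])  # [1, 2, 3]
```

Threads are a reasonable fit here only because the work is dominated by waiting on LLM calls; that is an assumption about the workload, not something the plan states.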
+ +**Spec:** `docs/superpowers/specs/2026-05-23-stakeholder-interview-subagents-design.md` + +--- + +## Phase 0 — Setup + +### Task 0: Add deps and pytest scaffold + +**Files:** +- Modify: `backend/pyproject.toml` +- Create: `backend/tests/__init__.py` +- Create: `backend/tests/conftest.py` +- Create: `backend/pytest.ini` + +- [ ] **Step 1: Add deps to `backend/pyproject.toml`** + +In the `dependencies` array (after `pydantic>=2.0.0`), add: +```toml + "PyYAML>=6.0", + "scikit-learn>=1.4", + "scipy>=1.12", + "numpy>=1.26", + "pandas>=2.1", +``` + +- [ ] **Step 2: Create `backend/pytest.ini`** + +```ini +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = -ra --strict-markers +markers = + integration: marks integration tests (deselect with -m 'not integration') +``` + +- [ ] **Step 3: Create `backend/tests/__init__.py`** + +Empty file. + +- [ ] **Step 4: Create `backend/tests/conftest.py`** + +```python +import os +import sys +import pathlib +import pytest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +os.environ.setdefault("LLM_API_KEY", "test") +os.environ.setdefault("LLM_BASE_URL", "https://example.invalid") +os.environ.setdefault("LLM_MODEL_NAME", "test-model") +os.environ.setdefault("ZEP_API_KEY", "test") + +@pytest.fixture +def tmp_uploads(tmp_path, monkeypatch): + monkeypatch.setenv("UPLOADS_DIR", str(tmp_path)) + return tmp_path +``` + +- [ ] **Step 5: Install + verify** + +Run: `cd backend && uv sync --python 3.12 && uv run pytest -q` +Expected: pytest reports `no tests ran` and exits with code 5, because nothing is collected yet. There should be no import or configuration errors, which confirms the infrastructure works. 
+ +- [ ] **Step 6: Commit** + +```bash +git add backend/pyproject.toml backend/uv.lock backend/pytest.ini backend/tests/__init__.py backend/tests/conftest.py +git commit -m "chore(interviews): add deps and pytest scaffold for interview subsystem" +``` + +--- + +### Task 1: Add interview config keys + +**Files:** +- Modify: `backend/app/config.py` + +- [ ] **Step 1: Read current config** + +Open `backend/app/config.py` and locate the `Config` class. + +- [ ] **Step 2: Add config keys** + +Add inside the `Config` class (preserving existing keys): +```python + # Interview subsystem + INTERVIEW_MAX_TOKENS_PER_RUN = int(os.environ.get("INTERVIEW_MAX_TOKENS_PER_RUN", 15_000_000)) + INTERVIEW_MAX_WORKERS = int(os.environ.get("INTERVIEW_MAX_WORKERS", 8)) + INTERVIEW_DEFAULT_LANGUAGE = os.environ.get("INTERVIEW_DEFAULT_LANGUAGE", "de") + LLM_STUB_MODE = os.environ.get("LLM_STUB_MODE", "false").lower() == "true" +``` + +- [ ] **Step 3: Verify import** + +Run: `cd backend && uv run python -c "from app.config import Config; print(Config.INTERVIEW_MAX_WORKERS, Config.LLM_STUB_MODE)"` +Expected: `8 False` + +- [ ] **Step 4: Commit** + +```bash +git add backend/app/config.py +git commit -m "feat(interviews): add interview config keys (token budget, workers, language, stub mode)" +``` + +--- + +## Phase 1 — Foundation + +### Task 2: Pydantic models for instruments and responses + +**Files:** +- Create: `backend/app/models/interview.py` +- Create: `backend/tests/interviews/__init__.py` +- Test: `backend/tests/interviews/test_models.py` + +- [ ] **Step 1: Write failing test** + +Create `backend/tests/interviews/__init__.py` (empty), then `backend/tests/interviews/test_models.py`: +```python +import pytest +from pydantic import ValidationError +from app.models.interview import ( + LikertItem, LikertInstrument, LikertResponse, + InterviewPhase, SubagentKind, +) + +def test_likert_item_requires_de_and_en(): + item = LikertItem(item_id="x1", de="Frage", en="Question", scale=5) + assert 
item.scale == 5 + +def test_likert_item_rejects_bad_scale(): + with pytest.raises(ValidationError): + LikertItem(item_id="x1", de="d", en="e", scale=2) + +def test_likert_instrument_unique_item_ids(): + with pytest.raises(ValidationError): + LikertInstrument( + name="t", + items=[LikertItem(item_id="a", de="d", en="e", scale=5), + LikertItem(item_id="a", de="d", en="e", scale=5)], + ) + +def test_likert_response_validates_scale_range(): + # LikertResponse accepts 1..7, so the out-of-range probe must be 8 (6 would be valid) + with pytest.raises(ValidationError): + LikertResponse(agent_id=1, phase=InterviewPhase.T0, + responses={"a": 8}, confidence={"a": 0.5}) + +def test_subagent_kind_enum(): + assert SubagentKind.LONGITUDINAL.value == "longitudinal" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_models.py -v` +Expected: ImportError (module not yet created). + +- [ ] **Step 3: Create `backend/app/models/interview.py`** + +```python +from __future__ import annotations +from enum import Enum +from typing import Optional +from pydantic import BaseModel, Field, field_validator, model_validator + +class InterviewPhase(str, Enum): + T0 = "T0" + T1 = "T1" + +class SubagentKind(str, Enum): + LONGITUDINAL = "longitudinal" + DIVERSITY = "diversity" + DELPHI = "delphi" + SCENARIO = "scenario" + +class LikertItem(BaseModel): + item_id: str + de: str + en: str + scale: int = Field(ge=3, le=7) + family: Optional[str] = None + reverse_coded: bool = False + + @field_validator("scale") + @classmethod + def odd_scale(cls, v: int) -> int: + if v not in (3, 5, 7): + raise ValueError("scale must be 3, 5, or 7") + return v + +class LikertInstrument(BaseModel): + name: str + version: str = "1.0" + language_default: str = "de" + items: list[LikertItem] + + @model_validator(mode="after") + def unique_item_ids(self) -> "LikertInstrument": + ids = [i.item_id for i in self.items] + if len(set(ids)) != len(ids): + raise ValueError("duplicate item_id in instrument") + return self + +class LikertResponse(BaseModel): + 
agent_id: int + phase: InterviewPhase + responses: dict[str, int] + confidence: dict[str, float] = Field(default_factory=dict) + open_comment: Optional[str] = None + memory_available: bool = True + failed_items: list[str] = Field(default_factory=list) + + @model_validator(mode="after") + def values_in_range(self) -> "LikertResponse": + for k, v in self.responses.items(): + if not 1 <= v <= 7: + raise ValueError(f"response {k}={v} out of 1..7 range") + for k, v in self.confidence.items(): + if not 0.0 <= v <= 1.0: + raise ValueError(f"confidence {k}={v} out of 0..1 range") + return self + +class QSortStatement(BaseModel): + statement_id: str + de: str + en: str + +class QSortInstrument(BaseModel): + name: str + version: str = "1.0" + statements: list[QSortStatement] + distribution: list[int] # e.g. [2,3,4,6,4,3,2] for -3..+3 + +class QSortResponse(BaseModel): + agent_id: int + placements: dict[str, int] # statement_id -> bucket (-3..+3) + likert_axes: dict[str, int] # axis_id -> 1..7 + +class DelphiOpenResponse(BaseModel): + agent_id: int + round: int = 1 + answers: dict[str, str] # question_id -> free text + +class DelphiRatingResponse(BaseModel): + agent_id: int + round: int + ratings: dict[str, dict[str, int]] # theme_id -> {importance, plausibility} + justification: Optional[str] = None + +class ScenarioRating(BaseModel): + desirability: int = Field(ge=1, le=7) + plausibility: int = Field(ge=1, le=7) + impact_on_my_group: int = Field(ge=1, le=7) + fairness: int = Field(ge=1, le=7) + if_woke_up_response: str + +class ScenarioResponse(BaseModel): + agent_id: int + ratings: dict[str, ScenarioRating] # scenario_id -> rating +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_models.py -v` +Expected: 5 passed. 
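`LikertItem` carries a `reverse_coded` flag, but this task does not say how it is meant to be applied. A minimal sketch of the conventional recoding (`scale + 1 - value`), assuming it would run before any aggregation; the helper name `recode` is hypothetical and not part of the plan:

```python
def recode(value: int, scale: int, reverse_coded: bool) -> int:
    """Return the score in the item's natural direction.

    Conventional reverse scoring on a 1..scale Likert item maps
    1 -> scale, 2 -> scale - 1, and so on (hypothetical helper).
    """
    if not 1 <= value <= scale:
        raise ValueError(f"value {value} outside 1..{scale}")
    return scale + 1 - value if reverse_coded else value


# A reverse-coded 5-point item: strong agreement (5) becomes 1.
print(recode(5, scale=5, reverse_coded=True))   # 1
print(recode(3, scale=5, reverse_coded=True))   # 3 (the midpoint is unchanged)
print(recode(4, scale=5, reverse_coded=False))  # 4
```

Whether recoding belongs in the model layer or in the aggregation step is a design choice the plan leaves open.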
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/models/interview.py backend/tests/interviews/__init__.py backend/tests/interviews/test_models.py +git commit -m "feat(interviews): add pydantic models for instruments and responses" +``` + +--- + +### Task 3: YAML instrument loader + validator + +**Files:** +- Create: `backend/app/services/interviews/__init__.py` +- Create: `backend/app/services/interviews/instrument_loader.py` +- Create: `backend/scripts/instruments/__init__.py` (empty marker so tests can import path) +- Test: `backend/tests/interviews/test_instrument_loader.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_instrument_loader.py +import pytest +from app.services.interviews.instrument_loader import ( + load_likert_instrument, InstrumentValidationError, +) + +def _write(tmp_path, text): + p = tmp_path / "inst.yaml" + p.write_text(text, encoding="utf-8") + return p + +def test_loads_valid_likert(tmp_path): + p = _write(tmp_path, """ +name: longitudinal_v1 +version: "1.0" +language_default: de +items: + - item_id: stk_1 + de: "Der westliche Dorschbestand wird sich erholen" + en: "Western cod stock will recover" + scale: 5 + family: stocks +""") + inst = load_likert_instrument(p) + assert inst.name == "longitudinal_v1" + assert len(inst.items) == 1 + +def test_rejects_duplicate_item_id(tmp_path): + p = _write(tmp_path, """ +name: x +items: + - {item_id: a, de: d, en: e, scale: 5} + - {item_id: a, de: d, en: e, scale: 5} +""") + with pytest.raises(InstrumentValidationError): + load_likert_instrument(p) + +def test_rejects_missing_required_field(tmp_path): + p = _write(tmp_path, """ +name: x +items: + - {item_id: a, de: d, scale: 5} +""") + with pytest.raises(InstrumentValidationError): + load_likert_instrument(p) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_instrument_loader.py -v` +Expected: ImportError. 
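Step 3 of this task fingerprints instrument files with `instrument_hash` (SHA-256 of the raw bytes, truncated to 16 hex characters). The property this relies on can be sketched in isolation using only the standard library; `fingerprint` is a stand-in name for illustration, not the plan's function:

```python
import hashlib
import tempfile
from pathlib import Path


def fingerprint(path: Path) -> str:
    # Same recipe as the plan's instrument_hash: hash raw bytes, keep 16 hex chars.
    return hashlib.sha256(path.read_bytes()).hexdigest()[:16]


with tempfile.TemporaryDirectory() as tmp:
    inst = Path(tmp) / "inst.yaml"
    inst.write_text("name: x\nitems: []\n", encoding="utf-8")
    before = fingerprint(inst)
    same = fingerprint(inst)
    # A comment-only edit still changes the bytes, hence the fingerprint.
    inst.write_text("name: x\nitems: []\n# edited\n", encoding="utf-8")
    after = fingerprint(inst)

print(before == same)   # True
print(before != after)  # True
```

Because the hash covers bytes rather than parsed content, whitespace or comment changes also produce a new fingerprint. That is stricter than hashing the parsed YAML and may or may not be what a frozen snapshot should detect.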
+ +- [ ] **Step 3: Create loader** + +Create `backend/app/services/interviews/__init__.py` (empty), `backend/scripts/instruments/__init__.py` (empty), then `backend/app/services/interviews/instrument_loader.py`: + +```python +from __future__ import annotations +import hashlib +import json +from pathlib import Path +import yaml +from pydantic import ValidationError +from app.models.interview import ( + LikertInstrument, QSortInstrument, +) + +class InstrumentValidationError(ValueError): + pass + +def _parse_yaml(path: Path) -> dict: + if not path.exists(): + raise InstrumentValidationError(f"instrument file not found: {path}") + try: + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + except yaml.YAMLError as e: + raise InstrumentValidationError(f"YAML parse error in {path}: {e}") from e + if not isinstance(data, dict): + raise InstrumentValidationError(f"top-level YAML must be a mapping in {path}") + return data + +def load_likert_instrument(path: Path) -> LikertInstrument: + data = _parse_yaml(Path(path)) + try: + return LikertInstrument(**data) + except ValidationError as e: + raise InstrumentValidationError(str(e)) from e + +def load_qsort_instrument(path: Path) -> QSortInstrument: + data = _parse_yaml(Path(path)) + try: + return QSortInstrument(**data) + except ValidationError as e: + raise InstrumentValidationError(str(e)) from e + +def instrument_hash(path: Path) -> str: + data = Path(path).read_bytes() + return hashlib.sha256(data).hexdigest()[:16] + +def freeze_snapshot(instruments: dict[str, Path], out_path: Path) -> dict: + snapshot = { + name: { + "path": str(p), + "hash": instrument_hash(p), + "content": _parse_yaml(p), + } + for name, p in instruments.items() + } + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2), encoding="utf-8") + return snapshot +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest 
tests/interviews/test_instrument_loader.py -v` +Expected: 3 passed. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interviews/__init__.py backend/app/services/interviews/instrument_loader.py backend/scripts/instruments/__init__.py backend/tests/interviews/test_instrument_loader.py +git commit -m "feat(interviews): YAML instrument loader with pydantic validation and hash freezing" +``` + +--- + +### Task 4: LLM stub mode + +**Files:** +- Modify: `backend/app/utils/llm_client.py` +- Test: `backend/tests/interviews/test_llm_stub.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_llm_stub.py +from app.utils.llm_client import LLMClient + +def test_stub_mode_returns_deterministic_canned_json(monkeypatch): + monkeypatch.setenv("LLM_STUB_MODE", "true") + from app.config import Config + # Config is read at import time; patch the attribute so it is restored after the test + monkeypatch.setattr(Config, "LLM_STUB_MODE", True) + client = LLMClient(api_key="x", base_url="x", model="x") + messages = [ + {"role": "system", "content": "You are persona_42. Return JSON."}, + {"role": "user", "content": "stub_key=longitudinal:item_001"}, + ] + out1 = client.chat_json(messages=messages, temperature=0.0) + out2 = client.chat_json(messages=messages, temperature=0.0) + assert out1 == out2 + assert isinstance(out1, dict) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_llm_stub.py -v` +Expected: FAIL (real API call attempted or stub absent). + +- [ ] **Step 3: Read current `llm_client.py`** + +Read the file to locate `chat` and `chat_json` method bodies and where to insert the stub branch. 
+ +- [ ] **Step 4: Add stub branch** + +At the top of `LLMClient.chat` (before the OpenAI call), insert: +```python + from app.config import Config + if getattr(Config, "LLM_STUB_MODE", False): + return self._stub_response(messages) +``` + +And at the top of `LLMClient.chat_json` (before delegating), insert the same guard returning a parsed dict via `self._stub_response_json(messages)`. + +Add these methods to `LLMClient`: +```python + def _stub_key(self, messages: list[dict]) -> str: + user_msg = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "") + sys_msg = next((m["content"] for m in messages if m.get("role") == "system"), "") + # Allow callers to embed an explicit stub_key=... token + for chunk in user_msg.split(): + if chunk.startswith("stub_key="): + return chunk[len("stub_key="):] + import hashlib + return hashlib.sha256((sys_msg + "|" + user_msg).encode("utf-8")).hexdigest()[:12] + + def _stub_response(self, messages: list[dict]) -> str: + import json as _json + return _json.dumps(self._stub_response_json(messages), ensure_ascii=False) + + def _stub_response_json(self, messages: list[dict]) -> dict: + key = self._stub_key(messages) + # Deterministic centered Likert + plausible open text + digit = sum(ord(c) for c in key) % 5 + 1 + return { + "stub_key": key, + "responses": {"item_001": digit, "item_002": digit, "item_003": (digit % 5) + 1}, + "confidence": {"item_001": 0.7, "item_002": 0.7, "item_003": 0.6}, + "open_comment": f"stub:{key}", + } +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_llm_stub.py -v` +Expected: 1 passed. 
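The stub in Step 4 derives its key with `hashlib` rather than Python's built-in `hash()`. That matters because `hash()` on strings is salted per process by default and would not give stable values across CI runs. A standalone sketch of the same key and digit derivation, assuming the logic from Step 4 is copied unchanged (the free-function names are illustrative):

```python
import hashlib


def stub_key(messages: list[dict]) -> str:
    user_msg = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "")
    sys_msg = next((m["content"] for m in messages if m.get("role") == "system"), "")
    # An explicit stub_key=... token in the user message wins over the hash.
    for chunk in user_msg.split():
        if chunk.startswith("stub_key="):
            return chunk[len("stub_key="):]
    return hashlib.sha256((sys_msg + "|" + user_msg).encode("utf-8")).hexdigest()[:12]


def stub_digit(key: str) -> int:
    # Maps any key onto a 1..5 Likert value.
    return sum(ord(c) for c in key) % 5 + 1


explicit = [{"role": "user", "content": "Q? stub_key=longitudinal:item_001"}]
implicit = [{"role": "system", "content": "persona"}, {"role": "user", "content": "Q?"}]

print(stub_key(explicit))                        # longitudinal:item_001
print(len(stub_key(implicit)))                   # 12
print(1 <= stub_digit(stub_key(implicit)) <= 5)  # True
```

Note that the stub always answers for `item_001` to `item_003`, so subagents whose validators require their own item ids would need instrument-aware canned responses; how the plan handles that is not shown in this section.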
+ +- [ ] **Step 6: Commit** + +```bash +git add backend/app/utils/llm_client.py backend/tests/interviews/test_llm_stub.py +git commit -m "feat(interviews): LLM stub mode for deterministic CI tests" +``` + +--- + +### Task 5: StakeholderInterviewer base class + +**Files:** +- Create: `backend/app/services/interviews/base.py` +- Test: `backend/tests/interviews/test_base_interviewer.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_base_interviewer.py +import json +import pytest +from app.services.interviews.base import StakeholderInterviewer, MemoryDigest, PersonaRecord + +class _FakeLLM: + def __init__(self, responses): + self.responses = list(responses) + self.calls = [] + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + self.calls.append(messages) + return self.responses.pop(0) + +class _FakeMemory: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text=f"digest-for-{agent_id}", available=True) + +def test_in_character_prompt_includes_persona_and_memory(): + llm = _FakeLLM([{"x": 1}]) + mem = _FakeMemory() + interviewer = StakeholderInterviewer(llm=llm, memory=mem) + persona = PersonaRecord(agent_id=7, name="A", persona="I am a small-scale Baltic fisher.") + out = interviewer.ask_in_character(persona, user_prompt="Q?", schema_hint="{...}") + assert out == {"x": 1} + sys_msg = llm.calls[0][0]["content"] + assert "small-scale Baltic fisher" in sys_msg + assert "digest-for-7" in sys_msg + +def test_schema_retry_on_first_failure(): + bad_then_good = [{}, {"responses": {"a": 3}}] + llm = _FakeLLM(bad_then_good) + mem = _FakeMemory() + interviewer = StakeholderInterviewer(llm=llm, memory=mem) + def validator(d): + return d if "responses" in d else None + persona = PersonaRecord(agent_id=1, name="A", persona="p") + out = interviewer.ask_in_character(persona, user_prompt="Q?", schema_hint="x", validate=validator) + assert out == {"responses": {"a": 3}} + assert len(llm.calls) == 2 + +def 
test_two_failures_raise(): + llm = _FakeLLM([{}, {}]) + mem = _FakeMemory() + interviewer = StakeholderInterviewer(llm=llm, memory=mem) + persona = PersonaRecord(agent_id=1, name="A", persona="p") + with pytest.raises(ValueError): + interviewer.ask_in_character(persona, user_prompt="Q?", schema_hint="x", + validate=lambda d: d if "responses" in d else None) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_base_interviewer.py -v` +Expected: ImportError. + +- [ ] **Step 3: Implement base** + +`backend/app/services/interviews/base.py`: +```python +from __future__ import annotations +from dataclasses import dataclass, field +from typing import Any, Callable, Optional, Protocol + +@dataclass +class PersonaRecord: + agent_id: int + name: str + persona: str + profession: Optional[str] = None + bio: Optional[str] = None + +@dataclass +class MemoryDigest: + text: str + available: bool = True + +class MemoryProvider(Protocol): + def get_digest(self, agent_id: int, max_chars: int = 2000) -> MemoryDigest: ... + +class StakeholderInterviewer: + def __init__(self, llm, memory: MemoryProvider, language: str = "de"): + self.llm = llm + self.memory = memory + self.language = language + + def _system_prompt(self, persona: PersonaRecord, digest: MemoryDigest, schema_hint: str) -> str: + memory_block = digest.text if digest.available else "[no simulation memory available]" + lang_note = "Antworte ausschließlich auf Deutsch." if self.language == "de" else "Answer in English." + return ( + f"You are {persona.name}. {persona.persona}\n\n" + "You are answering a survey about the future of German fisheries. 
" + "Answer strictly in character based on your background, values, and what you experienced " + "during the simulated social media discourse summarised below.\n\n" + f"--- simulation memory digest ---\n{memory_block}\n--- end ---\n\n" + f"{lang_note} Return JSON ONLY matching this schema:\n{schema_hint}" + ) + + def ask_in_character( + self, + persona: PersonaRecord, + user_prompt: str, + schema_hint: str, + *, + temperature: float = 0.3, + max_tokens: Optional[int] = None, + validate: Optional[Callable[[dict], Optional[dict]]] = None, + ) -> dict: + digest = self.memory.get_digest(persona.agent_id) + messages = [ + {"role": "system", "content": self._system_prompt(persona, digest, schema_hint)}, + {"role": "user", "content": user_prompt}, + ] + out = self.llm.chat_json(messages=messages, temperature=temperature, max_tokens=max_tokens) + if validate is not None: + validated = validate(out) + if validated is not None: + return validated + messages.append({"role": "assistant", "content": str(out)}) + messages.append({"role": "user", "content": + "Your previous response did not match the required schema. " + f"Return ONLY valid JSON matching: {schema_hint}"}) + out = self.llm.chat_json(messages=messages, temperature=0.0, max_tokens=max_tokens) + validated = validate(out) + if validated is None: + raise ValueError(f"agent {persona.agent_id}: schema violation after retry") + return validated + return out +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_base_interviewer.py -v` +Expected: 3 passed. 
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interviews/base.py backend/tests/interviews/test_base_interviewer.py +git commit -m "feat(interviews): StakeholderInterviewer base with in-character prompting and schema retry" +``` + +--- + +## Phase 2 — Subagents + +### Task 6: Longitudinal subagent + instrument YAML + +**Files:** +- Create: `backend/scripts/instruments/longitudinal_v1.yaml` +- Create: `backend/app/services/interviews/longitudinal.py` +- Test: `backend/tests/interviews/test_longitudinal.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_longitudinal.py +from pathlib import Path +import pytest +from app.models.interview import InterviewPhase +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interviews.longitudinal import LongitudinalSubagent, run_aggregate + +class _FakeMem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) + +# All 12 item ids defined in longitudinal_v1.yaml (stk_1..3, gov_1..3, mkt_1..3, clm_1..3) +ITEM_IDS = [f"{fam}_{n}" for fam in ("stk", "gov", "mkt", "clm") for n in (1, 2, 3)] + +class _CannedLLM: + def __init__(self): self.n = 0 + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + self.n += 1 + # Answer every item: the subagent's validator rejects partial responses + return { + "responses": {k: 3 for k in ITEM_IDS}, + "confidence": {k: 0.7 for k in ITEM_IDS}, + "open_comment": "test", + } + +INSTRUMENT = Path(__file__).resolve().parents[2] / "scripts" / "instruments" / "longitudinal_v1.yaml" + +def test_longitudinal_administer_one_agent(): + sub = LongitudinalSubagent(llm=_CannedLLM(), memory=_FakeMem(), instrument_path=INSTRUMENT) + persona = PersonaRecord(agent_id=3, name="A", persona="p") + resp = sub.administer(persona, phase=InterviewPhase.T0) + assert resp.agent_id == 3 + assert resp.phase == InterviewPhase.T0 + assert set(resp.responses.keys()) >= {"stk_1", "gov_1", "mkt_1", "clm_1"} + +def test_longitudinal_aggregate_delta(): + from app.models.interview import LikertResponse + t0 = [LikertResponse(agent_id=i, phase=InterviewPhase.T0, + 
responses={"stk_1": 3, "gov_1": 4}, + confidence={"stk_1": 0.8, "gov_1": 0.8}) for i in range(5)] + t1 = [LikertResponse(agent_id=i, phase=InterviewPhase.T1, + responses={"stk_1": 4, "gov_1": 4}, + confidence={"stk_1": 0.8, "gov_1": 0.8}) for i in range(5)] + agg = run_aggregate(t0, t1) + assert agg["per_item"]["stk_1"]["mean_delta"] == 1.0 + assert agg["per_item"]["gov_1"]["mean_delta"] == 0.0 + assert agg["n_paired"] == 5 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_longitudinal.py -v` +Expected: ImportError + missing YAML file. + +- [ ] **Step 3: Create instrument YAML** + +`backend/scripts/instruments/longitudinal_v1.yaml`: +```yaml +name: longitudinal_v1 +version: "1.0" +language_default: de +items: + # Stock status & recovery + - {item_id: stk_1, family: stocks, scale: 5, + de: "Der westliche Dorschbestand wird sich bis 2035 erholen.", + en: "The Western Baltic cod stock will recover by 2035."} + - {item_id: stk_2, family: stocks, scale: 5, + de: "Der Heringsbestand in der westlichen Ostsee ist nicht mehr zu retten.", + en: "The Western Baltic herring stock can no longer be saved.", + reverse_coded: true} + - {item_id: stk_3, family: stocks, scale: 5, + de: "Wissenschaftliche Bestandsschätzungen sind generell zuverlässig.", + en: "Scientific stock assessments are generally reliable."} + # Governance & CFP + - {item_id: gov_1, family: governance, scale: 5, + de: "Die Gemeinsame Fischereipolitik der EU scheitert beim Schutz der Ostseefische.", + en: "The EU Common Fisheries Policy fails to protect Baltic fish.", + reverse_coded: true} + - {item_id: gov_2, family: governance, scale: 5, + de: "Entscheidungen über Fangquoten sollten stärker lokal getroffen werden.", + en: "Decisions on catch quotas should be taken more locally."} + - {item_id: gov_3, family: governance, scale: 5, + de: "Die deutsche Bundesregierung handelt entschlossen bei Fischereifragen.", + en: "The German federal government 
acts decisively on fisheries issues."} + # Market & MSC + - {item_id: mkt_1, family: market, scale: 5, + de: "Nur MSC-zertifizierter Fisch sollte verkauft werden dürfen.", + en: "Only MSC-certified fish should be allowed for sale."} + - {item_id: mkt_2, family: market, scale: 5, + de: "Importierter Fisch verdrängt die deutsche Kleinfischerei.", + en: "Imported fish displaces German small-scale fisheries."} + - {item_id: mkt_3, family: market, scale: 5, + de: "Verbraucher zahlen gerne mehr für nachhaltigen Ostseefisch.", + en: "Consumers gladly pay more for sustainable Baltic fish."} + # Climate & adaptation + - {item_id: clm_1, family: climate, scale: 5, + de: "Der Klimawandel macht traditionelle Ostseefischerei unmöglich.", + en: "Climate change makes traditional Baltic fisheries impossible.", + reverse_coded: true} + - {item_id: clm_2, family: climate, scale: 5, + de: "Aquakultur ist die Zukunft der deutschen Fischwirtschaft.", + en: "Aquaculture is the future of the German fishing industry."} + - {item_id: clm_3, family: climate, scale: 5, + de: "Die Fischerei muss sich grundlegend an neue Arten anpassen.", + en: "Fisheries must fundamentally adapt to new species."} +``` + +- [ ] **Step 4: Implement subagent** + +`backend/app/services/interviews/longitudinal.py`: +```python +from __future__ import annotations +import json +import math +from pathlib import Path +from typing import Optional +from app.models.interview import ( + LikertInstrument, LikertResponse, InterviewPhase, +) +from app.services.interviews.base import StakeholderInterviewer, PersonaRecord +from app.services.interviews.instrument_loader import load_likert_instrument + +class LongitudinalSubagent: + def __init__(self, llm, memory, instrument_path: Path, language: str = "de"): + self.instrument: LikertInstrument = load_likert_instrument(Path(instrument_path)) + self.interviewer = StakeholderInterviewer(llm=llm, memory=memory, language=language) + self.language = language + + def _schema_hint(self) 
-> str: + ids = [i.item_id for i in self.instrument.items] + return json.dumps({ + "responses": {k: "" for k in ids}, + "confidence": {k: "" for k in ids}, + "open_comment": "", + }, ensure_ascii=False) + + def _user_prompt(self) -> str: + lines = ["Bitte bewerten Sie die folgenden Aussagen auf einer Skala von 1 (lehne stark ab) bis 5 (stimme stark zu)." if self.language == "de" + else "Please rate the following statements on a scale from 1 (strongly disagree) to 5 (strongly agree)."] + for it in self.instrument.items: + txt = it.de if self.language == "de" else it.en + lines.append(f"- [{it.item_id}] {txt}") + return "\n".join(lines) + + def _validator(self, raw: dict) -> Optional[dict]: + if not isinstance(raw, dict): return None + resp = raw.get("responses") + if not isinstance(resp, dict): return None + required = {it.item_id for it in self.instrument.items} + if not required.issubset(resp.keys()): return None + for k, v in resp.items(): + if not isinstance(v, int) or not 1 <= v <= 5: return None + return raw + + def administer(self, persona: PersonaRecord, phase: InterviewPhase) -> LikertResponse: + raw = self.interviewer.ask_in_character( + persona, + user_prompt=self._user_prompt(), + schema_hint=self._schema_hint(), + validate=self._validator, + ) + return LikertResponse( + agent_id=persona.agent_id, + phase=phase, + responses={k: int(v) for k, v in raw["responses"].items()}, + confidence={k: float(v) for k, v in raw.get("confidence", {}).items()}, + open_comment=raw.get("open_comment"), + ) + +def run_aggregate(t0: list[LikertResponse], t1: list[LikertResponse]) -> dict: + by_t0 = {r.agent_id: r for r in t0} + by_t1 = {r.agent_id: r for r in t1} + paired = sorted(set(by_t0) & set(by_t1)) + items: set[str] = set() + for r in t0 + t1: + items.update(r.responses.keys()) + per_item: dict[str, dict] = {} + for it in sorted(items): + deltas = [] + for aid in paired: + v0 = by_t0[aid].responses.get(it) + v1 = by_t1[aid].responses.get(it) + if v0 is None or v1 is 
None: continue + deltas.append(v1 - v0) + if not deltas: + per_item[it] = {"mean_delta": None, "n": 0} + continue + m = sum(deltas) / len(deltas) + var = sum((d - m) ** 2 for d in deltas) / max(len(deltas) - 1, 1) + per_item[it] = { + "mean_delta": m, + "sd_delta": math.sqrt(var), + "n": len(deltas), + "n_positive": sum(1 for d in deltas if d > 0), + "n_negative": sum(1 for d in deltas if d < 0), + } + per_agent: dict[int, dict] = {} + for aid in paired: + r0 = by_t0[aid].responses + r1 = by_t1[aid].responses + common = set(r0) & set(r1) + total = sum(abs(r1[k] - r0[k]) for k in common) + per_agent[aid] = {"total_abs_drift": total, "n_items": len(common)} + return { + "n_paired": len(paired), + "n_t0_only": len(set(by_t0) - set(by_t1)), + "n_t1_only": len(set(by_t1) - set(by_t0)), + "per_item": per_item, + "per_agent": per_agent, + } +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_longitudinal.py -v` +Expected: 2 passed. + +- [ ] **Step 6: Commit** + +```bash +git add backend/scripts/instruments/longitudinal_v1.yaml backend/app/services/interviews/longitudinal.py backend/tests/interviews/test_longitudinal.py +git commit -m "feat(interviews): longitudinal subagent + 12-item Likert instrument" +``` + +--- + +### Task 7: Diversity subagent + Q-sort instrument + +**Files:** +- Create: `backend/scripts/instruments/diversity_v1.yaml` +- Create: `backend/app/services/interviews/diversity.py` +- Test: `backend/tests/interviews/test_diversity.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_diversity.py +from pathlib import Path +import numpy as np +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interviews.diversity import ( + DiversitySubagent, run_typology, +) + +class _Mem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) + +class _CannedLLM: + def chat_json(self, messages, 
temperature=0.0, max_tokens=None, **kw): + # Place all 24 statements into legal buckets per the forced distribution + placements = {} + buckets = [-3]*2 + [-2]*3 + [-1]*4 + [0]*6 + [1]*4 + [2]*3 + [3]*2 + for i in range(24): + placements[f"st_{i+1:02d}"] = buckets[i] + return { + "placements": placements, + "likert_axes": {"ax_pres_extr": 5, "ax_loc_eu": 3, "ax_sci_trad": 4, + "ax_ind_col": 4, "ax_short_long": 5, "ax_mkt_reg": 3}, + } + +INSTRUMENT = Path(__file__).resolve().parents[2] / "scripts" / "instruments" / "diversity_v1.yaml" + +def test_diversity_administer(): + sub = DiversitySubagent(llm=_CannedLLM(), memory=_Mem(), instrument_path=INSTRUMENT) + persona = PersonaRecord(agent_id=1, name="A", persona="p") + resp = sub.administer(persona) + assert len(resp.placements) == 24 + assert set(resp.likert_axes.keys()) == { + "ax_pres_extr","ax_loc_eu","ax_sci_trad","ax_ind_col","ax_short_long","ax_mkt_reg" + } + +def test_typology_runs_pca_kmeans(): + from app.models.interview import QSortResponse + rng = np.random.default_rng(42) + responses = [] + for aid in range(20): + placements = {f"st_{i+1:02d}": int(rng.integers(-3, 4)) for i in range(24)} + axes = {f"ax_{j}": int(rng.integers(1, 8)) for j in range(6)} + responses.append(QSortResponse(agent_id=aid, placements=placements, likert_axes=axes)) + result = run_typology(responses, n_clusters=3) + assert "clusters" in result + assert len(result["clusters"]) == 3 + assert "pca" in result + assert len(result["pca"]["components"]) >= 2 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_diversity.py -v` +Expected: ImportError. 
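The canned LLM in the Step 1 test fills the buckets according to the forced distribution `[2, 3, 4, 6, 4, 3, 2]` over `-3..+3`. A sketch of how a response could be checked against that distribution, assuming placements arrive as `statement_id -> bucket`; `distribution_violations` is a hypothetical helper, not something the plan defines:

```python
from collections import Counter


def distribution_violations(
    placements: dict[str, int], distribution: list[int]
) -> dict[int, tuple[int, int]]:
    """Return {bucket: (expected, actual)} for every bucket whose count is off."""
    half = len(distribution) // 2          # 7 slots -> buckets -3..+3
    counts = Counter(placements.values())
    problems: dict[int, tuple[int, int]] = {}
    for offset, expected in enumerate(distribution):
        bucket = offset - half
        actual = counts.get(bucket, 0)
        if actual != expected:
            problems[bucket] = (expected, actual)
    # Buckets outside the legal range count as violations too.
    for bucket, actual in counts.items():
        if not -half <= bucket <= half:
            problems[bucket] = (0, actual)
    return problems


dist = [2, 3, 4, 6, 4, 3, 2]
buckets = [-3] * 2 + [-2] * 3 + [-1] * 4 + [0] * 6 + [1] * 4 + [2] * 3 + [3] * 2
legal = {f"st_{i + 1:02d}": b for i, b in enumerate(buckets)}
print(distribution_violations(legal, dist))   # {}

skewed = dict(legal, st_01=0)                 # move one statement from -3 to 0
print(distribution_violations(skewed, dist))  # {-3: (2, 1), 0: (6, 7)}
```

Returning the offending buckets rather than a bare boolean would let a retry prompt tell the model exactly which piles are over- or under-filled.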
+ +- [ ] **Step 3: Create instrument YAML** + +`backend/scripts/instruments/diversity_v1.yaml`: +```yaml +name: diversity_v1 +version: "1.0" +language_default: de +distribution: [2, 3, 4, 6, 4, 3, 2] # buckets from -3 to +3, total 24 +statements: + - {statement_id: st_01, de: "Die Ostsee gehört den Fischern, die hier seit Generationen leben.", en: "The Baltic belongs to fishers who have lived here for generations."} + - {statement_id: st_02, de: "MSC-Zertifizierung schützt vor allem große Konzerne.", en: "MSC certification mainly protects large corporations."} + - {statement_id: st_03, de: "Wissenschaftliche Quoten sind die einzige Grundlage für Politik.", en: "Scientific quotas are the only legitimate basis for policy."} + - {statement_id: st_04, de: "Aquakultur kann Ostseefischerei ersetzen.", en: "Aquaculture can replace Baltic fisheries."} + - {statement_id: st_05, de: "Sportfischer schaden den Beständen mehr als die Berufsfischer.", en: "Recreational anglers harm stocks more than commercial fishers."} + - {statement_id: st_06, de: "Die EU-Fischereipolitik kennt die Ostsee nicht.", en: "EU fisheries policy doesn't understand the Baltic."} + - {statement_id: st_07, de: "Großtechnische Fischerei ist effizienter und damit nachhaltiger.", en: "Industrial fisheries are more efficient and therefore more sustainable."} + - {statement_id: st_08, de: "Wer Fisch isst, sollte mehr dafür bezahlen.", en: "Those who eat fish should pay more for it."} + - {statement_id: st_09, de: "Die Kleinfischerei muss subventioniert werden.", en: "Small-scale fisheries must be subsidised."} + - {statement_id: st_10, de: "Marine Schutzgebiete sind reine Symbolpolitik.", en: "Marine protected areas are mere symbolism."} + - {statement_id: st_11, de: "Russlands Krieg ändert alles in der Ostsee.", en: "Russia's war changes everything in the Baltic."} + - {statement_id: st_12, de: "Nur drastische Reduktion der Fangmengen rettet die Bestände.", en: "Only drastic catch reductions will save the 
stocks."} + - {statement_id: st_13, de: "NGOs übertreiben die Krise systematisch.", en: "NGOs systematically exaggerate the crisis."} + - {statement_id: st_14, de: "Klimawandel ist das eigentliche Problem, nicht die Fischerei.", en: "Climate change is the real problem, not fisheries."} + - {statement_id: st_15, de: "Tradition zählt mehr als kurzfristige Bestandszahlen.", en: "Tradition matters more than short-term stock numbers."} + - {statement_id: st_16, de: "Verbraucher entscheiden über die Zukunft des Fisches.", en: "Consumers decide the future of fish."} + - {statement_id: st_17, de: "Ohne Generalstreik der Fischer ändert sich nichts.", en: "Without a fishers' general strike, nothing will change."} + - {statement_id: st_18, de: "Die Bundesregierung sollte Kutter aufkaufen und stilllegen.", en: "The federal government should buy out and decommission boats."} + - {statement_id: st_19, de: "Die Dorschkrise ist Folge gescheiterter Politik.", en: "The cod crisis is the result of policy failure."} + - {statement_id: st_20, de: "Ostsee-Aquakultur ist ökologisch problematisch.", en: "Baltic aquaculture is ecologically problematic."} + - {statement_id: st_21, de: "Junge Menschen werden keinen Fischereibetrieb mehr übernehmen.", en: "Young people will no longer take over fishing businesses."} + - {statement_id: st_22, de: "Markt regelt sich selbst, auch beim Fisch.", en: "The market regulates itself, also for fish."} + - {statement_id: st_23, de: "Lokale Genossenschaften sind die Lösung.", en: "Local cooperatives are the solution."} + - {statement_id: st_24, de: "In 20 Jahren gibt es keine deutsche Ostseefischerei mehr.", en: "In 20 years there will be no German Baltic fisheries left."} +likert_axes: + - {axis_id: ax_pres_extr, scale: 7, de: "Bewahrung (1) vs. Nutzung (7)", en: "Preservation (1) vs. Extraction (7)"} + - {axis_id: ax_loc_eu, scale: 7, de: "Lokal (1) vs. EU-zentral (7)", en: "Local (1) vs. 
EU-central (7)"} + - {axis_id: ax_sci_trad, scale: 7, de: "Wissenschaft (1) vs. Tradition (7)", en: "Science-led (1) vs. Tradition-led (7)"} + - {axis_id: ax_ind_col, scale: 7, de: "Individuum (1) vs. Kollektiv (7)", en: "Individual (1) vs. Collective (7)"} + - {axis_id: ax_short_long,scale: 7, de: "Kurzfristig (1) vs. Langfristig (7)", en: "Short-term (1) vs. Long-term (7)"} + - {axis_id: ax_mkt_reg, scale: 7, de: "Markt (1) vs. Regulierung (7)", en: "Market (1) vs. Regulation (7)"} +``` + +- [ ] **Step 4: Implement subagent** + +`backend/app/services/interviews/diversity.py`: +```python +from __future__ import annotations +import json +from pathlib import Path +from typing import Optional +import numpy as np +from sklearn.decomposition import PCA +from sklearn.cluster import KMeans +import yaml +from app.models.interview import QSortResponse +from app.services.interviews.base import StakeholderInterviewer, PersonaRecord +from app.services.interviews.instrument_loader import InstrumentValidationError + +class DiversitySubagent: + def __init__(self, llm, memory, instrument_path: Path, language: str = "de"): + self.instrument = self._load(Path(instrument_path)) + self.interviewer = StakeholderInterviewer(llm=llm, memory=memory, language=language) + self.language = language + + def _load(self, path: Path) -> dict: + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + if not isinstance(data, dict) or "statements" not in data or "distribution" not in data: + raise InstrumentValidationError(f"invalid diversity instrument: {path}") + if sum(data["distribution"]) != len(data["statements"]): + raise InstrumentValidationError("distribution sum must equal number of statements") + return data + + def _schema_hint(self) -> str: + return json.dumps({ + "placements": {s["statement_id"]: "" for s in self.instrument["statements"]}, + "likert_axes": {a["axis_id"]: "" for a in self.instrument["likert_axes"]}, + }, ensure_ascii=False) + + def _user_prompt(self) 
-> str: + dist = self.instrument["distribution"] + buckets = list(range(-3, 4)) + bucket_desc = ", ".join(f"{b}:{n}" for b, n in zip(buckets, dist)) + lines = [ + ("Ordnen Sie jede Aussage genau einer Box von -3 (lehne stark ab) bis +3 (stimme stark zu) zu. " + f"Die Verteilung ist erzwungen: {bucket_desc}.") if self.language == "de" else + ("Place every statement into exactly one box from -3 (strongly disagree) to +3 (strongly agree). " + f"The distribution is forced: {bucket_desc}."), + "", + "Statements:", + ] + for s in self.instrument["statements"]: + txt = s["de"] if self.language == "de" else s["en"] + lines.append(f"- [{s['statement_id']}] {txt}") + lines += ["", "Then rate each axis from 1 to 7:"] + for a in self.instrument["likert_axes"]: + txt = a["de"] if self.language == "de" else a["en"] + lines.append(f"- [{a['axis_id']}] {txt}") + return "\n".join(lines) + + def _validator(self, raw: dict) -> Optional[dict]: + if not isinstance(raw, dict): return None + placements = raw.get("placements", {}) + axes = raw.get("likert_axes", {}) + statements = {s["statement_id"] for s in self.instrument["statements"]} + if set(placements.keys()) != statements: return None + dist = self.instrument["distribution"] + target = {b: n for b, n in zip(range(-3, 4), dist)} + got: dict[int, int] = {} + for v in placements.values(): + if not isinstance(v, int) or not -3 <= v <= 3: return None + got[v] = got.get(v, 0) + 1 + if got != target: return None + for a in self.instrument["likert_axes"]: + v = axes.get(a["axis_id"]) + if not isinstance(v, int) or not 1 <= v <= 7: return None + return raw + + def administer(self, persona: PersonaRecord) -> QSortResponse: + raw = self.interviewer.ask_in_character( + persona, + user_prompt=self._user_prompt(), + schema_hint=self._schema_hint(), + validate=self._validator, + ) + return QSortResponse( + agent_id=persona.agent_id, + placements={k: int(v) for k, v in raw["placements"].items()}, + likert_axes={k: int(v) for k, v in 
raw["likert_axes"].items()}, + ) + +def _vectorize(r: QSortResponse, statements: list[str], axes: list[str]) -> np.ndarray: + return np.array( + [r.placements.get(s, 0) for s in statements] + + [r.likert_axes.get(a, 4) for a in axes], + dtype=float, + ) + +def run_typology(responses: list[QSortResponse], n_clusters: int = 4) -> dict: + if not responses: + return {"n": 0, "clusters": [], "pca": {"components": [], "explained_variance": []}} + statements = sorted({k for r in responses for k in r.placements}) + axes = sorted({k for r in responses for k in r.likert_axes}) + X = np.vstack([_vectorize(r, statements, axes) for r in responses]) + n_clusters = min(n_clusters, len(responses)) + pca = PCA(n_components=min(5, X.shape[1], X.shape[0])) + pcs = pca.fit_transform(X) + km = KMeans(n_clusters=n_clusters, n_init=10, random_state=0) + labels = km.fit_predict(X) + clusters = [] + for c in range(n_clusters): + members = [responses[i].agent_id for i in range(len(responses)) if labels[i] == c] + centroid = km.cluster_centers_[c] + clusters.append({ + "cluster_id": int(c), + "n": len(members), + "agent_ids": members, + "top_loadings": { + statements[i] if i < len(statements) else axes[i - len(statements)]: float(centroid[i]) + for i in np.argsort(np.abs(centroid))[::-1][:8].tolist() + }, + }) + return { + "n": len(responses), + "clusters": clusters, + "pca": { + "components": pcs.tolist(), + "explained_variance": pca.explained_variance_ratio_.tolist(), + "agent_ids": [r.agent_id for r in responses], + }, + } +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_diversity.py -v` +Expected: 2 passed. 
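Note: `run_typology` fills each cluster's `top_loadings` with the k-means centroid coordinates of largest absolute value (centroid values, not PCA loadings). The ranking step in isolation, as a dependency-free sketch; `top_centroid_features` and the sample numbers are hypothetical.

```python
def top_centroid_features(centroid: list[float], names: list[str], k: int = 8) -> dict[str, float]:
    """Rank features by |centroid value| and keep the k strongest, strongest first."""
    order = sorted(range(len(centroid)), key=lambda i: abs(centroid[i]), reverse=True)
    return {names[i]: float(centroid[i]) for i in order[:k]}

# Made-up centroid: three Q-sort statements (-3..+3) and one Likert axis (1..7).
names = ["st_01", "st_02", "st_03", "ax_mkt_reg"]
centroid = [0.5, -2.5, 1.0, 6.0]
top = top_centroid_features(centroid, names, k=2)
```

Because the Likert axes live on 1..7 while placements are centred on 0, axis coordinates tend to rank high by absolute value regardless of how distinctive they are. Centring or standardising the feature matrix before ranking may be worth considering.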
+ +- [ ] **Step 6: Commit** + +```bash +git add backend/scripts/instruments/diversity_v1.yaml backend/app/services/interviews/diversity.py backend/tests/interviews/test_diversity.py +git commit -m "feat(interviews): diversity subagent with Q-sort + 6 Likert axes + PCA/k-means typology" +``` + +--- + +### Task 8: Delphi subagent (three rounds) + +**Files:** +- Create: `backend/scripts/instruments/delphi_v1.yaml` +- Create: `backend/app/services/interviews/delphi.py` +- Test: `backend/tests/interviews/test_delphi.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_delphi.py +from pathlib import Path +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interviews.delphi import ( + DelphiSubagent, extract_themes, convergence_metrics, +) + +INSTRUMENT = Path(__file__).resolve().parents[2] / "scripts" / "instruments" / "delphi_v1.yaml" + +class _Mem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) + +class _R1LLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + return {"answers": { + "q1": "Klimawandel, Quoten, Generationswechsel", + "q2": "MSC, Aquakultur", + "q3": "Russland, EU-Politik", + "q4": "Verbraucherpreise", + }} + +class _R2LLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + return {"ratings": {f"theme_{i}": {"importance": 4, "plausibility": 3} for i in range(5)}} + +class _ExtractLLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + return {"themes": [ + {"theme_id": "theme_0", "label": "Klimawandel"}, + {"theme_id": "theme_1", "label": "Quoten"}, + {"theme_id": "theme_2", "label": "MSC"}, + {"theme_id": "theme_3", "label": "EU-Politik"}, + {"theme_id": "theme_4", "label": "Generationswechsel"}, + ]} + +def test_delphi_round1_open(): + sub = DelphiSubagent(llm=_R1LLM(), memory=_Mem(), instrument_path=INSTRUMENT) + persona = PersonaRecord(agent_id=2, 
name="A", persona="p") + resp = sub.administer_round1(persona) + assert resp.round == 1 + assert len(resp.answers) == 4 + +def test_extract_themes_aggregates(): + from app.models.interview import DelphiOpenResponse + r1 = [DelphiOpenResponse(agent_id=i, answers={"q1": "Klimawandel", "q2": "MSC"}) for i in range(3)] + themes = extract_themes(r1, llm=_ExtractLLM()) + assert len(themes) == 5 + assert all("theme_id" in t for t in themes) + +def test_convergence_metrics(): + from app.models.interview import DelphiRatingResponse + r2 = [DelphiRatingResponse(agent_id=i, round=2, + ratings={"t1": {"importance": 3, "plausibility": 3}}) for i in range(5)] + r3 = [DelphiRatingResponse(agent_id=i, round=3, + ratings={"t1": {"importance": 4, "plausibility": 4}}) for i in range(5)] + conv = convergence_metrics(r2, r3) + assert "t1" in conv + assert conv["t1"]["delta_iqr_importance"] is not None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_delphi.py -v` +Expected: ImportError. 
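Note: `test_convergence_metrics` above expects a `delta_iqr_importance` value per theme. A sketch of the underlying idea, assuming convergence is read as a narrowing interquartile range between round 2 and round 3. The `iqr` helper and the ratings below are illustrative only.

```python
import statistics

def iqr(values: list[float]) -> float:
    """Interquartile range; falls back to the full range for fewer than 4 values."""
    if not values:
        return 0.0
    xs = sorted(values)
    if len(xs) < 4:
        return float(xs[-1] - xs[0])
    q1, _, q3 = statistics.quantiles(xs, n=4)
    return q3 - q1

round2 = [1, 2, 3, 4, 5]  # importance ratings before group feedback
round3 = [3, 3, 3, 4, 3]  # ratings after seeing group medians
delta = iqr(round3) - iqr(round2)  # negative means the panel converged
```

A negative delta indicates tighter agreement in round 3. A delta of zero, as in the test fixture where every agent gives the same rating, is a valid result and not a missing value.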
+ +- [ ] **Step 3: Create instrument YAML** + +`backend/scripts/instruments/delphi_v1.yaml`: +```yaml +name: delphi_v1 +version: "1.0" +language_default: de +rounds: 3 +questions: + - {question_id: q1, de: "Welche drei Faktoren werden die deutsche Fischerei bis 2040 am stärksten prägen?", en: "Which three factors will most shape German fisheries by 2040?"} + - {question_id: q2, de: "Welche Akteurinnen und Akteure sind heute entscheidend, werden aber unterschätzt?", en: "Which actors are decisive today but underestimated?"} + - {question_id: q3, de: "Was sollte sich in den nächsten fünf Jahren ändern, damit die Fischerei eine Zukunft hat?", en: "What should change in the next five years for fisheries to have a future?"} + - {question_id: q4, de: "Welcher Trend macht Ihnen am meisten Hoffnung – und welcher am meisten Sorge?", en: "Which trend gives you most hope — and which most concern?"} +``` + +- [ ] **Step 4: Implement subagent** + +`backend/app/services/interviews/delphi.py`: +```python +from __future__ import annotations +import json +import statistics +from pathlib import Path +from typing import Optional +import yaml +from app.models.interview import ( + DelphiOpenResponse, DelphiRatingResponse, +) +from app.services.interviews.base import StakeholderInterviewer, PersonaRecord + +class DelphiSubagent: + def __init__(self, llm, memory, instrument_path: Path, language: str = "de"): + with Path(instrument_path).open("r", encoding="utf-8") as f: + self.instrument = yaml.safe_load(f) + self.interviewer = StakeholderInterviewer(llm=llm, memory=memory, language=language) + self.llm = llm + self.language = language + + # --- Round 1: open questions --- + def _r1_schema(self) -> str: + return json.dumps({ + "answers": {q["question_id"]: "" for q in self.instrument["questions"]} + }, ensure_ascii=False) + + def _r1_prompt(self) -> str: + lines = ["Bitte beantworten Sie offen:" if self.language == "de" else "Please answer openly:"] + for q in 
self.instrument["questions"]: + txt = q["de"] if self.language == "de" else q["en"] + lines.append(f"[{q['question_id']}] {txt}") + return "\n".join(lines) + + def _r1_validate(self, raw: dict) -> Optional[dict]: + if not isinstance(raw, dict): return None + ans = raw.get("answers") + if not isinstance(ans, dict): return None + required = {q["question_id"] for q in self.instrument["questions"]} + if not required.issubset(ans.keys()): return None + return raw + + def administer_round1(self, persona: PersonaRecord) -> DelphiOpenResponse: + raw = self.interviewer.ask_in_character( + persona, user_prompt=self._r1_prompt(), + schema_hint=self._r1_schema(), validate=self._r1_validate, + ) + return DelphiOpenResponse(agent_id=persona.agent_id, round=1, + answers={k: str(v) for k, v in raw["answers"].items()}) + + # --- Round 2: rate themes --- + def _r2_schema(self, theme_ids: list[str]) -> str: + return json.dumps({ + "ratings": {tid: {"importance": "", "plausibility": ""} for tid in theme_ids} + }, ensure_ascii=False) + + def _r2_prompt(self, themes: list[dict]) -> str: + head = "Bewerten Sie jedes Thema nach Wichtigkeit (1-5) und Plausibilität (1-5):" if self.language == "de" \ + else "Rate each theme on importance (1-5) and plausibility (1-5):" + body = [f"- [{t['theme_id']}] {t['label']}" for t in themes] + return head + "\n" + "\n".join(body) + + def _r2_validate(self, theme_ids: list[str]): + def v(raw: dict) -> Optional[dict]: + if not isinstance(raw, dict): return None + ratings = raw.get("ratings", {}) + if set(ratings.keys()) != set(theme_ids): return None + for tid, r in ratings.items(): + if not isinstance(r, dict): return None + for key in ("importance", "plausibility"): + if not isinstance(r.get(key), int) or not 1 <= r[key] <= 5: return None + return raw + return v + + def administer_round2(self, persona: PersonaRecord, themes: list[dict]) -> DelphiRatingResponse: + theme_ids = [t["theme_id"] for t in themes] + raw = self.interviewer.ask_in_character( + 
persona, user_prompt=self._r2_prompt(themes), + schema_hint=self._r2_schema(theme_ids), validate=self._r2_validate(theme_ids), + ) + return DelphiRatingResponse(agent_id=persona.agent_id, round=2, + ratings={k: dict(v) for k, v in raw["ratings"].items()}) + + # --- Round 3: revise after seeing group stats --- + def administer_round3( + self, persona: PersonaRecord, themes: list[dict], group_stats: dict, own_r2: DelphiRatingResponse + ) -> DelphiRatingResponse: + theme_ids = [t["theme_id"] for t in themes] + head = ("Sie sehen unten die anonymisierten Gruppenwerte (Median, IQR). " + "Bitte überarbeiten Sie Ihre Bewertungen, wenn Sie möchten, und begründen Sie kurz.") \ + if self.language == "de" else \ + ("Below are the anonymised group values (median, IQR). " + "Please revise your ratings if you wish and add a short justification.") + ctx_lines = [] + for t in themes: + tid = t["theme_id"] + gs = group_stats.get(tid, {}) + own = own_r2.ratings.get(tid, {}) + ctx_lines.append( + f"[{tid}] {t['label']} — group importance median={gs.get('imp_median')}, " + f"IQR={gs.get('imp_iqr')}; plausibility median={gs.get('plaus_median')}, " + f"IQR={gs.get('plaus_iqr')}. Your R2: imp={own.get('importance')}, plaus={own.get('plausibility')}." 
+ ) + prompt = head + "\n\n" + "\n".join(ctx_lines) + schema = json.dumps({ + "ratings": {tid: {"importance": "", "plausibility": ""} for tid in theme_ids}, + "justification": "", + }, ensure_ascii=False) + def validate(raw): + if not isinstance(raw, dict): return None + ratings = raw.get("ratings", {}) + if set(ratings.keys()) != set(theme_ids): return None + for r in ratings.values(): + if not isinstance(r, dict): return None + for key in ("importance", "plausibility"): + if not isinstance(r.get(key), int) or not 1 <= r[key] <= 5: return None + return raw + raw = self.interviewer.ask_in_character(persona, user_prompt=prompt, + schema_hint=schema, validate=validate) + return DelphiRatingResponse( + agent_id=persona.agent_id, round=3, + ratings={k: dict(v) for k, v in raw["ratings"].items()}, + justification=raw.get("justification"), + ) + +def extract_themes(round1: list[DelphiOpenResponse], llm) -> list[dict]: + text_blocks = [] + for r in round1: + for qid, ans in r.answers.items(): + text_blocks.append(f"[agent {r.agent_id} {qid}] {ans}") + schema = json.dumps({"themes": [{"theme_id": "", "label": ""}]}, ensure_ascii=False) + messages = [ + {"role": "system", "content": + "You extract distinct thematic codes from open-ended German fisheries survey responses. " + f"Return JSON ONLY matching: {schema}. 
Use stable theme_ids of form theme_0, theme_1, …"}, + {"role": "user", "content": "Responses:\n" + "\n".join(text_blocks) + "\n\nReturn up to 12 distinct themes."}, + ] + raw = llm.chat_json(messages=messages, temperature=0.0) + themes = raw.get("themes", []) if isinstance(raw, dict) else [] + out = [] + for i, t in enumerate(themes): + if isinstance(t, dict) and "label" in t: + out.append({"theme_id": t.get("theme_id") or f"theme_{i}", "label": str(t["label"])}) + return out + +def _iqr(xs: list[float]) -> float: + if not xs: return 0.0 + xs = sorted(xs) + q1 = statistics.quantiles(xs, n=4)[0] if len(xs) >= 4 else xs[0] + q3 = statistics.quantiles(xs, n=4)[2] if len(xs) >= 4 else xs[-1] + return q3 - q1 + +def convergence_metrics(r2: list[DelphiRatingResponse], r3: list[DelphiRatingResponse]) -> dict: + by_r2 = {r.agent_id: r for r in r2} + by_r3 = {r.agent_id: r for r in r3} + themes: set[str] = set() + for r in r2 + r3: + themes.update(r.ratings.keys()) + out: dict[str, dict] = {} + for t in sorted(themes): + imp_r2 = [by_r2[a].ratings[t]["importance"] for a in by_r2 if t in by_r2[a].ratings] + imp_r3 = [by_r3[a].ratings[t]["importance"] for a in by_r3 if t in by_r3[a].ratings] + plaus_r2 = [by_r2[a].ratings[t]["plausibility"] for a in by_r2 if t in by_r2[a].ratings] + plaus_r3 = [by_r3[a].ratings[t]["plausibility"] for a in by_r3 if t in by_r3[a].ratings] + out[t] = { + "imp_median_r2": statistics.median(imp_r2) if imp_r2 else None, + "imp_median_r3": statistics.median(imp_r3) if imp_r3 else None, + "imp_iqr_r2": _iqr(imp_r2), + "imp_iqr_r3": _iqr(imp_r3), + "delta_iqr_importance": _iqr(imp_r3) - _iqr(imp_r2), + "plaus_iqr_r2": _iqr(plaus_r2), + "plaus_iqr_r3": _iqr(plaus_r3), + "delta_iqr_plausibility": _iqr(plaus_r3) - _iqr(plaus_r2), + } + return out + +def group_stats_from_r2(r2: list[DelphiRatingResponse]) -> dict: + themes: set[str] = set() + for r in r2: themes.update(r.ratings.keys()) + stats: dict[str, dict] = {} + for t in themes: + imps = 
[r.ratings[t]["importance"] for r in r2 if t in r.ratings] + plauss = [r.ratings[t]["plausibility"] for r in r2 if t in r.ratings] + stats[t] = { + "imp_median": statistics.median(imps) if imps else None, + "imp_iqr": _iqr(imps), + "plaus_median": statistics.median(plauss) if plauss else None, + "plaus_iqr": _iqr(plauss), + } + return stats +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_delphi.py -v` +Expected: 3 passed. + +- [ ] **Step 6: Commit** + +```bash +git add backend/scripts/instruments/delphi_v1.yaml backend/app/services/interviews/delphi.py backend/tests/interviews/test_delphi.py +git commit -m "feat(interviews): Delphi subagent (3 rounds: open, rate, revise) + convergence metrics" +``` + +--- + +### Task 9: Scenario subagent + +**Files:** +- Create: `backend/scripts/instruments/scenario_v1.yaml` +- Create: `backend/app/services/interviews/scenario.py` +- Test: `backend/tests/interviews/test_scenario.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_scenario.py +from pathlib import Path +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interviews.scenario import ScenarioSubagent, polarity_matrix + +INSTRUMENT = Path(__file__).resolve().parents[2] / "scripts" / "instruments" / "scenario_v1.yaml" + +class _Mem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) + +class _LLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + return {"ratings": {sid: { + "desirability": 4, "plausibility": 3, "impact_on_my_group": 5, "fairness": 3, + "if_woke_up_response": f"act-on-{sid}", + } for sid in ("S1", "S2", "S3", "S4")}} + +def test_scenario_administer(): + sub = ScenarioSubagent(llm=_LLM(), memory=_Mem(), instrument_path=INSTRUMENT) + persona = PersonaRecord(agent_id=1, name="A", persona="p") + resp = sub.administer(persona) + assert 
set(resp.ratings.keys()) == {"S1", "S2", "S3", "S4"} + assert resp.ratings["S1"].desirability == 4 + +def test_polarity_matrix(): + from app.models.interview import ScenarioResponse, ScenarioRating + responses = [ScenarioResponse(agent_id=i, ratings={ + "S1": ScenarioRating(desirability=5, plausibility=4, impact_on_my_group=5, fairness=4, + if_woke_up_response="x"), + }) for i in range(3)] + m = polarity_matrix(responses) + assert "S1" in m + assert m["S1"]["mean_desirability"] == 5 + assert m["S1"]["n"] == 3 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_scenario.py -v` +Expected: ImportError. + +- [ ] **Step 3: Create instrument YAML** + +`backend/scripts/instruments/scenario_v1.yaml`: +```yaml +name: scenario_v1 +version: "1.0" +language_default: de +scenarios: + - scenario_id: S1 + label_de: "Erholung 2040" + label_en: "Recovery 2040" + description_de: | + Bis 2040 haben sich Dorsch- und Heringsbestände in der westlichen Ostsee + deutlich erholt. MSC-Zertifizierung ist branchenweit Standard. Die kleine + Küstenfischerei hat sich stabilisiert; die Politik gilt als erfolgreich. + description_en: | + By 2040, Western Baltic cod and herring stocks have substantially recovered. + MSC certification is industry-wide standard. Small-scale coastal fisheries + have stabilised; policy is regarded as successful. + - scenario_id: S2 + label_de: "Kollaps 2040" + label_en: "Collapse 2040" + description_de: | + Bis 2040 sind Dorsch- und Heringsbestände zusammengebrochen. Die Flotte + ist halbiert, Aquakultur dominiert den Markt, Häfen veröden. + description_en: | + By 2040, cod and herring stocks have collapsed. The fleet is halved, + aquaculture dominates the market, harbour towns decline. 
+ - scenario_id: S3 + label_de: "Festung Europa 2040" + label_en: "Fortress Europe 2040" + description_de: | + Bis 2040 verfolgt die EU eine protektionistische Politik mit hohen Importzöllen, + Meeresschutzgebiete bedecken 30% der Ostsee, Sportfischerei ist stark eingeschränkt. + description_en: | + By 2040, the EU pursues a protectionist policy with high import tariffs, + MPAs cover 30% of the Baltic, recreational fishing is strongly curtailed. + - scenario_id: S4 + label_de: "Privatisierung 2040" + label_en: "Privatisation 2040" + description_de: | + Bis 2040 sind Fangrechte als handelbare Quoten (ITQs) etabliert. Die Branche + hat sich konsolidiert; nur große, kapitalstarke Unternehmen sind übrig. + description_en: | + By 2040, fishing rights are tradable quotas (ITQs). The industry has + consolidated; only large, well-capitalised firms remain. +dimensions: + - {dimension_id: desirability, scale: 7, + de: "Wie wünschenswert ist dieses Szenario?", en: "How desirable is this scenario?"} + - {dimension_id: plausibility, scale: 7, + de: "Wie plausibel ist dieses Szenario?", en: "How plausible is this scenario?"} + - {dimension_id: impact_on_my_group, scale: 7, + de: "Wie stark trifft es Ihre Gruppe?", en: "How strongly does it affect your group?"} + - {dimension_id: fairness, scale: 7, + de: "Wie fair ist dieses Szenario?", en: "How fair is this scenario?"} +``` + +- [ ] **Step 4: Implement subagent** + +`backend/app/services/interviews/scenario.py`: +```python +from __future__ import annotations +import json +import statistics +from pathlib import Path +from typing import Optional +import yaml +from app.models.interview import ScenarioRating, ScenarioResponse +from app.services.interviews.base import StakeholderInterviewer, PersonaRecord + +class ScenarioSubagent: + def __init__(self, llm, memory, instrument_path: Path, language: str = "de"): + with Path(instrument_path).open("r", encoding="utf-8") as f: + self.instrument = yaml.safe_load(f) + self.interviewer = 
StakeholderInterviewer(llm=llm, memory=memory, language=language) + self.language = language + + def _schema_hint(self) -> str: + sids = [s["scenario_id"] for s in self.instrument["scenarios"]] + return json.dumps({ + "ratings": {sid: { + "desirability": "", + "plausibility": "", + "impact_on_my_group": "", + "fairness": "", + "if_woke_up_response": "", + } for sid in sids} + }, ensure_ascii=False) + + def _user_prompt(self) -> str: + head = ("Bewerten Sie jedes der folgenden Szenarien auf vier Dimensionen (1-7) " + "und beantworten Sie kurz, was Sie tun würden, wenn Sie in dieser Welt aufwachten.") \ + if self.language == "de" else \ + ("Rate each of the following scenarios on four dimensions (1-7) " + "and briefly answer what you would do if you woke up in this world.") + blocks = [] + for s in self.instrument["scenarios"]: + label = s["label_de"] if self.language == "de" else s["label_en"] + desc = s["description_de"] if self.language == "de" else s["description_en"] + blocks.append(f"--- {s['scenario_id']}: {label} ---\n{desc}") + return head + "\n\n" + "\n\n".join(blocks) + + def _validate(self, raw: dict) -> Optional[dict]: + if not isinstance(raw, dict): return None + sids = {s["scenario_id"] for s in self.instrument["scenarios"]} + ratings = raw.get("ratings", {}) + if set(ratings.keys()) != sids: return None + for v in ratings.values(): + if not isinstance(v, dict): return None + for k in ("desirability", "plausibility", "impact_on_my_group", "fairness"): + if not isinstance(v.get(k), int) or not 1 <= v[k] <= 7: return None + if not isinstance(v.get("if_woke_up_response", ""), str): return None + return raw + + def administer(self, persona: PersonaRecord) -> ScenarioResponse: + raw = self.interviewer.ask_in_character( + persona, user_prompt=self._user_prompt(), + schema_hint=self._schema_hint(), validate=self._validate, + ) + ratings = {sid: ScenarioRating(**v) for sid, v in raw["ratings"].items()} + return ScenarioResponse(agent_id=persona.agent_id, 
ratings=ratings) + +def polarity_matrix(responses: list[ScenarioResponse]) -> dict: + matrix: dict[str, dict] = {} + sids: set[str] = set() + for r in responses: sids.update(r.ratings.keys()) + for sid in sorted(sids): + vals = [r.ratings[sid] for r in responses if sid in r.ratings] + if not vals: + matrix[sid] = {"n": 0} + continue + matrix[sid] = { + "n": len(vals), + "mean_desirability": statistics.mean(v.desirability for v in vals), + "mean_plausibility": statistics.mean(v.plausibility for v in vals), + "mean_impact": statistics.mean(v.impact_on_my_group for v in vals), + "mean_fairness": statistics.mean(v.fairness for v in vals), + "sd_desirability": statistics.pstdev([v.desirability for v in vals]) if len(vals) > 1 else 0.0, + "sd_plausibility": statistics.pstdev([v.plausibility for v in vals]) if len(vals) > 1 else 0.0, + } + return matrix +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_scenario.py -v` +Expected: 2 passed. 
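Note: `polarity_matrix` reports its `sd_*` fields with `statistics.pstdev` (population SD, divides by n), whereas the longitudinal drift code divides by n - 1. With small panels the two differ noticeably. A quick illustration with made-up ratings:

```python
import statistics

ratings = [5, 3, 4]  # hypothetical desirability ratings from three agents

population_sd = statistics.pstdev(ratings)  # divides by n
sample_sd = statistics.stdev(ratings)       # divides by n - 1
```

Whichever convention is chosen, using the same one across subagents keeps the synthesiser's cross-method comparisons consistent.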
+ +- [ ] **Step 6: Commit** + +```bash +git add backend/scripts/instruments/scenario_v1.yaml backend/app/services/interviews/scenario.py backend/tests/interviews/test_scenario.py +git commit -m "feat(interviews): scenario subagent with 4 futures × 4 dimensions + polarity matrix" +``` + +--- + +## Phase 3 — Storage and Zep + +### Task 10: Interview storage layout writer + +**Files:** +- Create: `backend/app/services/interviews/storage.py` +- Test: `backend/tests/interviews/test_storage.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_storage.py +import json +from pathlib import Path +from app.models.interview import ( + LikertResponse, InterviewPhase, SubagentKind, +) +from app.services.interviews.storage import InterviewStore + +def test_run_directory_layout(tmp_path): + store = InterviewStore(root=tmp_path, sim_id="sim42") + run_dir = store.start_run(phase=InterviewPhase.T0, subagent=SubagentKind.LONGITUDINAL) + assert run_dir.exists() + assert run_dir.parent.name == "longitudinal" + assert run_dir.parent.parent.name == "T0" + +def test_append_response(tmp_path): + store = InterviewStore(root=tmp_path, sim_id="sim42") + run_dir = store.start_run(phase=InterviewPhase.T0, subagent=SubagentKind.LONGITUDINAL) + r = LikertResponse(agent_id=1, phase=InterviewPhase.T0, + responses={"a": 3}, confidence={"a": 0.5}) + store.append_response(run_dir, r) + contents = (run_dir / "responses.jsonl").read_text() + assert json.loads(contents.splitlines()[0])["agent_id"] == 1 + +def test_write_aggregate_and_latest_pointer(tmp_path): + store = InterviewStore(root=tmp_path, sim_id="sim42") + run_dir = store.start_run(phase=InterviewPhase.T1, subagent=SubagentKind.SCENARIO) + store.write_aggregate(run_dir, {"k": 1}) + store.mark_latest(run_dir) + latest = (run_dir.parent / "latest.json").read_text() + assert json.loads(latest)["run_dir"].endswith(run_dir.name) + +def test_audit_log_append(tmp_path): + store = InterviewStore(root=tmp_path, 
sim_id="sim42") + run_dir = store.start_run(phase=InterviewPhase.T0, subagent=SubagentKind.DELPHI) + store.audit(run_dir, agent_id=7, event="schema_violation", detail="missing key x") + audit = (run_dir / "audit.jsonl").read_text() + assert "schema_violation" in audit +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_storage.py -v` +Expected: ImportError. + +- [ ] **Step 3: Implement storage** + +`backend/app/services/interviews/storage.py`: +```python +from __future__ import annotations +import json +import time +import uuid +from pathlib import Path +from typing import Any +from pydantic import BaseModel +from app.models.interview import InterviewPhase, SubagentKind + +class InterviewStore: + def __init__(self, root: Path, sim_id: str): + self.base = Path(root) / "simulations" / sim_id / "interviews" + self.base.mkdir(parents=True, exist_ok=True) + + def start_run(self, phase: InterviewPhase, subagent: SubagentKind) -> Path: + run_id = time.strftime("%Y%m%dT%H%M%S") + "-" + uuid.uuid4().hex[:6] + run_dir = self.base / phase.value / subagent.value / run_id + run_dir.mkdir(parents=True, exist_ok=True) + meta = {"run_id": run_id, "phase": phase.value, "subagent": subagent.value, + "created_at": time.time()} + (run_dir / "run.json").write_text(json.dumps(meta, indent=2), encoding="utf-8") + return run_dir + + def append_response(self, run_dir: Path, model: BaseModel) -> None: + path = run_dir / "responses.jsonl" + with path.open("a", encoding="utf-8") as f: + f.write(model.model_dump_json() + "\n") + + def append_jsonl(self, run_dir: Path, filename: str, payload: dict | BaseModel) -> None: + path = run_dir / filename + with path.open("a", encoding="utf-8") as f: + if isinstance(payload, BaseModel): + f.write(payload.model_dump_json() + "\n") + else: + f.write(json.dumps(payload, ensure_ascii=False) + "\n") + + def read_responses(self, run_dir: Path, filename: str = "responses.jsonl") -> list[dict]: + path 
= run_dir / filename + if not path.exists(): return [] + return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines() if line.strip()] + + def write_aggregate(self, run_dir: Path, payload: dict) -> None: + (run_dir / "aggregate.json").write_text( + json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + def write_named(self, run_dir: Path, name: str, payload: Any) -> None: + (run_dir / name).write_text( + json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + def audit(self, run_dir: Path, agent_id: int | None, event: str, detail: str = "") -> None: + entry = {"ts": time.time(), "agent_id": agent_id, "event": event, "detail": detail} + with (run_dir / "audit.jsonl").open("a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + + def mark_latest(self, run_dir: Path) -> None: + pointer = run_dir.parent / "latest.json" + pointer.write_text(json.dumps({ + "run_dir": str(run_dir.relative_to(self.base)), + }), encoding="utf-8") + + def latest_run(self, phase: InterviewPhase, subagent: SubagentKind) -> Path | None: + pointer = self.base / phase.value / subagent.value / "latest.json" + if not pointer.exists(): return None + # Read with the same explicit encoding that mark_latest writes with + rel = json.loads(pointer.read_text(encoding="utf-8"))["run_dir"] + path = self.base / rel + return path if path.exists() else None +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_storage.py -v` +Expected: 4 passed. 
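The run-directory and `latest.json` conventions above can be checked in isolation with a minimal standard-library sketch. It re-implements only `start_run`, `mark_latest`, and `latest_run` as free functions; the `demo_layout` helper and the literal phase/subagent strings (`"T0"`, `"delphi"`) are illustrative assumptions, not values confirmed by `app.models.interview`.

```python
from __future__ import annotations

import json
import tempfile
import time
import uuid
from pathlib import Path


def start_run(base: Path, phase: str, subagent: str) -> Path:
    """Create <base>/<phase>/<subagent>/<run_id>/ and return it."""
    run_id = time.strftime("%Y%m%dT%H%M%S") + "-" + uuid.uuid4().hex[:6]
    run_dir = base / phase / subagent / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir


def mark_latest(base: Path, run_dir: Path) -> None:
    """Write a latest.json pointer next to the run directories."""
    pointer = run_dir.parent / "latest.json"
    # Store the path relative to base so the tree can be moved as a whole.
    rel = run_dir.relative_to(base).as_posix()
    pointer.write_text(json.dumps({"run_dir": rel}), encoding="utf-8")


def latest_run(base: Path, phase: str, subagent: str) -> Path | None:
    """Resolve the pointer, or return None if no run was marked."""
    pointer = base / phase / subagent / "latest.json"
    if not pointer.exists():
        return None
    rel = json.loads(pointer.read_text(encoding="utf-8"))["run_dir"]
    path = base / rel
    return path if path.exists() else None


def demo_layout() -> tuple[bool, bool, bool]:
    """Hypothetical walk-through: two runs, only the second marked latest."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp) / "simulations" / "sim42" / "interviews"
        first = start_run(base, "T0", "delphi")
        second = start_run(base, "T0", "delphi")
        unmarked = latest_run(base, "T0", "delphi") is None
        mark_latest(base, second)
        resolved = latest_run(base, "T0", "delphi") == second
        return unmarked, resolved, first != second
```

Because the pointer is written only by `mark_latest`, a run that crashes before that call leaves the previous `latest.json` untouched, which is presumably why the orchestrator marks a run as latest only after its aggregates are written.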
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interviews/storage.py backend/tests/interviews/test_storage.py +git commit -m "feat(interviews): JSONL/JSON storage layout with run_id directories and latest pointer" +``` + +--- + +### Task 11: Zep episode writer for interviews + +**Files:** +- Create: `backend/app/services/interviews/zep_writer.py` +- Test: `backend/tests/interviews/test_zep_writer.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_zep_writer.py +from app.models.interview import ( + LikertResponse, InterviewPhase, SubagentKind, +) +from app.services.interviews.zep_writer import InterviewZepWriter + +class _FakeMemoryUpdater: + def __init__(self): + self.events = [] + def add_activity(self, activity): + self.events.append(activity) + def add_text_episode(self, graph_id, text): + self.events.append({"graph_id": graph_id, "text": text}) + +def test_per_agent_episode_text(): + upd = _FakeMemoryUpdater() + w = InterviewZepWriter(memory_updater=upd, graph_id="g1") + r = LikertResponse(agent_id=42, phase=InterviewPhase.T1, + responses={"stk_1": 4, "gov_1": 3}, + confidence={"stk_1": 0.8, "gov_1": 0.7}) + w.write_per_agent(SubagentKind.LONGITUDINAL, r, agent_name="Fischer Müller") + assert any("Fischer Müller" in str(e) for e in upd.events) + assert any("longitudinal/T1" in str(e) for e in upd.events) + +def test_aggregate_episode(): + upd = _FakeMemoryUpdater() + w = InterviewZepWriter(memory_updater=upd, graph_id="g1") + w.write_aggregate(SubagentKind.SCENARIO, summary="S1 mean desirability 5.2; S2 mean 2.1") + assert any("S1 mean" in str(e) for e in upd.events) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_zep_writer.py -v` +Expected: ImportError. 
+ +- [ ] **Step 3: Implement writer** + +`backend/app/services/interviews/zep_writer.py`: +```python +from __future__ import annotations +from typing import Any, Optional +from app.models.interview import ( + LikertResponse, QSortResponse, DelphiRatingResponse, ScenarioResponse, SubagentKind, +) + +class InterviewZepWriter: + """Mirrors `ZepGraphMemoryUpdater.add_activity` usage but for interview episodes. + + The real `ZepGraphMemoryUpdater` may expose a lower-level text-episode method + (`add_text_episode`, tried first) or `add_activity`; this writer adapts to either via duck typing. + """ + def __init__(self, memory_updater, graph_id: str): + self.updater = memory_updater + self.graph_id = graph_id + + def _emit(self, text: str) -> None: + if hasattr(self.updater, "add_text_episode"): + self.updater.add_text_episode(self.graph_id, text) + elif hasattr(self.updater, "add_activity"): + self.updater.add_activity({"graph_id": self.graph_id, "text": text}) + else: + raise RuntimeError("memory_updater has neither add_text_episode nor add_activity") + + def _summarize_likert(self, r: LikertResponse) -> str: + mean_v = sum(r.responses.values()) / max(len(r.responses), 1) + top = sorted(r.responses.items(), key=lambda kv: -kv[1])[:3] + bot = sorted(r.responses.items(), key=lambda kv: kv[1])[:3] + return (f"mean={mean_v:.2f}; agrees with {[k for k,_ in top]}; " + f"disagrees with {[k for k,_ in bot]}") + + def _summarize_qsort(self, r: QSortResponse) -> str: + plus = [k for k, v in r.placements.items() if v >= 2] + minus = [k for k, v in r.placements.items() if v <= -2] + return f"+strongly:{plus}; -strongly:{minus}" + + def _summarize_scenario(self, r: ScenarioResponse) -> str: + parts = [f"{sid}: des={rt.desirability} plaus={rt.plausibility}" + for sid, rt in r.ratings.items()] + return "; ".join(parts) + + def write_per_agent( + self, subagent: SubagentKind, response: Any, agent_name: str, + phase: Optional[str] = None, + ) -> None: + if isinstance(response, LikertResponse): + phase = phase 
or response.phase.value + summary = self._summarize_likert(response) + elif isinstance(response, QSortResponse): + phase = phase or "T1" + summary = self._summarize_qsort(response) + elif isinstance(response, ScenarioResponse): + phase = phase or "T1" + summary = self._summarize_scenario(response) + elif isinstance(response, DelphiRatingResponse): + phase = phase or f"T1/R{response.round}" + summary = f"round={response.round}; {len(response.ratings)} themes rated" + else: + phase = phase or "T1" + summary = str(response)[:200] + text = f"Agent {agent_name} (interview/{subagent.value}/{phase}): {summary}" + self._emit(text) + + def write_aggregate(self, subagent: SubagentKind, summary: str) -> None: + self._emit(f"Interview aggregate ({subagent.value}): {summary}") +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_zep_writer.py -v` +Expected: 2 passed. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interviews/zep_writer.py backend/tests/interviews/test_zep_writer.py +git commit -m "feat(interviews): Zep writer adapts add_activity/add_text_episode for per-agent + aggregate episodes" +``` + +--- + +## Phase 4 — Orchestrator, lifecycle, synthesiser + +### Task 12: InterviewOrchestrator (parallel fan-out) + +**Files:** +- Create: `backend/app/services/interview_orchestrator.py` +- Test: `backend/tests/interviews/test_orchestrator.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_orchestrator.py +from pathlib import Path +import pytest +from app.models.interview import InterviewPhase, SubagentKind +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interview_orchestrator import ( + InterviewOrchestrator, PersonaProvider, +) + +INST_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments" + +class _Mem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) 
+ +class _LLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + sys_text = next((m["content"] for m in messages if m["role"] == "system"), "") + if "longitudinal" in sys_text or "stk_" in (messages[-1].get("content") or ""): + return { + "responses": {k: 3 for k in ("stk_1","stk_2","stk_3","gov_1","gov_2","gov_3", + "mkt_1","mkt_2","mkt_3","clm_1","clm_2","clm_3")}, + "confidence": {}, "open_comment": "ok", + } + return {} + +class _Personas(PersonaProvider): + def __init__(self, n=3): + self._items = [PersonaRecord(agent_id=i, name=f"A{i}", persona="p") for i in range(n)] + def all(self): return list(self._items) + +class _NoopZep: + def write_per_agent(self, *a, **kw): pass + def write_aggregate(self, *a, **kw): pass + +def test_pre_phase_runs_longitudinal_only(tmp_path): + orch = InterviewOrchestrator( + llm=_LLM(), memory=_Mem(), personas=_Personas(3), + instrument_dir=INST_DIR, store_root=tmp_path, sim_id="sim1", + zep_writer=_NoopZep(), max_workers=2, + ) + result = orch.run_pre() + assert result["longitudinal"]["n_responded"] == 3 + assert "diversity" not in result # only longitudinal in pre-phase + +def test_partial_failure_does_not_kill_run(tmp_path): + class _FlakyLLM: + def __init__(self): self.n = 0 + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + self.n += 1 + if self.n % 2 == 0: + raise RuntimeError("simulated LLM 5xx") + return { + "responses": {k: 3 for k in ("stk_1","stk_2","stk_3","gov_1","gov_2","gov_3", + "mkt_1","mkt_2","mkt_3","clm_1","clm_2","clm_3")}, + "confidence": {}, "open_comment": "ok", + } + orch = InterviewOrchestrator( + llm=_FlakyLLM(), memory=_Mem(), personas=_Personas(4), + instrument_dir=INST_DIR, store_root=tmp_path, sim_id="sim2", + zep_writer=_NoopZep(), max_workers=1, + ) + result = orch.run_pre() + assert result["longitudinal"]["n_responded"] < 4 + assert result["longitudinal"]["n_failed"] > 0 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv 
run pytest tests/interviews/test_orchestrator.py -v` +Expected: ImportError. + +- [ ] **Step 3: Implement orchestrator** + +`backend/app/services/interview_orchestrator.py`: +```python +from __future__ import annotations +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from typing import Protocol +from app.models.interview import ( + InterviewPhase, SubagentKind, LikertResponse, QSortResponse, + DelphiOpenResponse, DelphiRatingResponse, ScenarioResponse, +) +from app.services.interviews.base import PersonaRecord +from app.services.interviews.longitudinal import LongitudinalSubagent, run_aggregate as longitudinal_aggregate +from app.services.interviews.diversity import DiversitySubagent, run_typology +from app.services.interviews.delphi import ( + DelphiSubagent, extract_themes, convergence_metrics, group_stats_from_r2, +) +from app.services.interviews.scenario import ScenarioSubagent, polarity_matrix +from app.services.interviews.storage import InterviewStore +from app.services.interviews.instrument_loader import freeze_snapshot + +class PersonaProvider(Protocol): + def all(self) -> list[PersonaRecord]: ... 
+ +class InterviewOrchestrator: + def __init__( + self, llm, memory, personas: PersonaProvider, + instrument_dir: Path, store_root: Path, sim_id: str, + zep_writer, max_workers: int = 8, language: str = "de", + ): + self.llm = llm + self.memory = memory + self.personas = personas + self.instrument_dir = Path(instrument_dir) + self.store = InterviewStore(root=store_root, sim_id=sim_id) + self.zep_writer = zep_writer + self.max_workers = max_workers + self.language = language + # Freeze snapshot once per orchestrator lifetime + freeze_snapshot( + instruments={ + "longitudinal": self.instrument_dir / "longitudinal_v1.yaml", + "diversity": self.instrument_dir / "diversity_v1.yaml", + "delphi": self.instrument_dir / "delphi_v1.yaml", + "scenario": self.instrument_dir / "scenario_v1.yaml", + }, + out_path=self.store.base / "instruments_used.json", + ) + + # --- Generic per-agent runner --- + def _fan_out(self, run_dir, agent_fn, personas, audit_label): + ok: list = [] + failed: list[int] = [] + with ThreadPoolExecutor(max_workers=self.max_workers) as pool: + futures = {pool.submit(agent_fn, p): p for p in personas} + for fut in as_completed(futures): + p = futures[fut] + try: + out = fut.result() + ok.append(out) + self.store.append_response(run_dir, out) + except Exception as e: + failed.append(p.agent_id) + self.store.audit(run_dir, agent_id=p.agent_id, + event="agent_failed", detail=f"{audit_label}: {e!r}") + return ok, failed + + # --- Pre-phase (T0) --- + def run_pre(self) -> dict: + sub = LongitudinalSubagent(self.llm, self.memory, + self.instrument_dir / "longitudinal_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p, phase=InterviewPhase.T0), + self.personas.all(), audit_label="longitudinal_T0", + ) + for r in ok: + persona = next(p for p in self.personas.all() if p.agent_id == r.agent_id) + try: 
self.zep_writer.write_per_agent(SubagentKind.LONGITUDINAL, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"longitudinal": {"n_responded": len(ok), "n_failed": len(failed), + "run_dir": str(run_dir)}} + + # --- Post-phase (T1) --- + def run_post(self) -> dict: + personas = self.personas.all() + out: dict = {} + with ThreadPoolExecutor(max_workers=4) as pool: + futures = { + "longitudinal": pool.submit(self._post_longitudinal, personas), + "diversity": pool.submit(self._post_diversity, personas), + "scenario": pool.submit(self._post_scenario, personas), + } + for name, fut in futures.items(): + try: out[name] = fut.result() + except Exception as e: out[name] = {"error": repr(e)} + # Delphi runs sequentially (R1 → R2 → R3) and uses the LLM for theme extraction + try: out["delphi"] = self._post_delphi(personas) + except Exception as e: out["delphi"] = {"error": repr(e)} + return out + + def _post_longitudinal(self, personas) -> dict: + sub = LongitudinalSubagent(self.llm, self.memory, + self.instrument_dir / "longitudinal_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.LONGITUDINAL) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p, phase=InterviewPhase.T1), + personas, audit_label="longitudinal_T1", + ) + # Aggregate using T0 + T1 + t0_path = self.store.latest_run(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + t0_raw = self.store.read_responses(t0_path) if t0_path else [] + t0 = [LikertResponse(**d) for d in t0_raw] + agg = longitudinal_aggregate(t0, ok) + self.store.write_aggregate(run_dir, agg) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.LONGITUDINAL, r, persona.name) + except Exception: pass + try: self.zep_writer.write_aggregate(SubagentKind.LONGITUDINAL, + f"n_paired={agg['n_paired']}") + except Exception: pass + self.store.mark_latest(run_dir) + return 
{"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_diversity(self, personas) -> dict: + sub = DiversitySubagent(self.llm, self.memory, + self.instrument_dir / "diversity_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.DIVERSITY) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p), personas, audit_label="diversity", + ) + typology = run_typology(ok) + self.store.write_named(run_dir, "typology.json", typology) + self.store.write_aggregate(run_dir, {"n": len(ok), "n_failed": len(failed), + "clusters": typology["clusters"]}) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.DIVERSITY, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_scenario(self, personas) -> dict: + sub = ScenarioSubagent(self.llm, self.memory, + self.instrument_dir / "scenario_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.SCENARIO) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p), personas, audit_label="scenario", + ) + matrix = polarity_matrix(ok) + self.store.write_named(run_dir, "polarity_matrix.json", matrix) + self.store.write_aggregate(run_dir, {"n": len(ok), "n_failed": len(failed), + "polarity": matrix}) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.SCENARIO, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_delphi(self, personas) -> dict: + sub = DelphiSubagent(self.llm, self.memory, + self.instrument_dir / "delphi_v1.yaml", + language=self.language) + run_dir = 
self.store.start_run(InterviewPhase.T1, SubagentKind.DELPHI) + # Round 1 + r1_ok, r1_failed = self._fan_out( + run_dir, lambda p: sub.administer_round1(p), personas, audit_label="delphi_r1", + ) + # Copy R1 responses into a dedicated file (_fan_out has already appended them to responses.jsonl) + for r in r1_ok: self.store.append_jsonl(run_dir, "round1_themes.jsonl", r) + # Extract themes from R1 + themes = extract_themes(r1_ok, llm=self.llm) + self.store.write_named(run_dir, "themes.json", {"themes": themes}) + # Round 2 + r2_ok, r2_failed = self._fan_out( + run_dir, lambda p: sub.administer_round2(p, themes), + [p for p in personas if p.agent_id in {r.agent_id for r in r1_ok}], + audit_label="delphi_r2", + ) + for r in r2_ok: self.store.append_jsonl(run_dir, "round2_ratings.jsonl", r) + gstats = group_stats_from_r2(r2_ok) + # Round 3 + r2_by = {r.agent_id: r for r in r2_ok} + r3_personas = [p for p in personas if p.agent_id in r2_by] + def r3_call(p): return sub.administer_round3(p, themes, gstats, r2_by[p.agent_id]) + r3_ok, r3_failed = self._fan_out(run_dir, r3_call, r3_personas, audit_label="delphi_r3") + for r in r3_ok: self.store.append_jsonl(run_dir, "round3_revisions.jsonl", r) + # Convergence + conv = convergence_metrics(r2_ok, r3_ok) + self.store.write_named(run_dir, "convergence.json", conv) + self.store.write_aggregate(run_dir, { + "n_r1": len(r1_ok), "n_r2": len(r2_ok), "n_r3": len(r3_ok), + "n_failed_r1": len(r1_failed), "n_failed_r2": len(r2_failed), "n_failed_r3": len(r3_failed), + "themes": themes, + }) + for r in r3_ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.DELPHI, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_r1": len(r1_ok), "n_r2": len(r2_ok), "n_r3": len(r3_ok), + "run_dir": str(run_dir)} + + # --- Re-run a single subagent --- + def rerun(self, subagent: SubagentKind) -> dict: + personas = self.personas.all() + if subagent == SubagentKind.LONGITUDINAL: return 
{"longitudinal": self._post_longitudinal(personas)} + if subagent == SubagentKind.DIVERSITY: return {"diversity": self._post_diversity(personas)} + if subagent == SubagentKind.SCENARIO: return {"scenario": self._post_scenario(personas)} + if subagent == SubagentKind.DELPHI: return {"delphi": self._post_delphi(personas)} + raise ValueError(f"unknown subagent {subagent}") +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_orchestrator.py -v` +Expected: 2 passed. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interview_orchestrator.py backend/tests/interviews/test_orchestrator.py +git commit -m "feat(interviews): orchestrator with two-phase lifecycle, parallel fan-out, isolated failures" +``` + +--- + +### Task 13: Simulation manager lifecycle hooks + +**Files:** +- Modify: `backend/app/services/simulation_manager.py` +- Test: `backend/tests/interviews/test_simulation_hooks.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_simulation_hooks.py +from app.services.simulation_manager import SimulationManager, SimulationState + +def test_register_post_ready_hook_invoked(monkeypatch): + called = [] + mgr = SimulationManager() + mgr.register_on_ready(lambda state: called.append(("ready", state.sim_id))) + state = SimulationState(sim_id="abc", status="ready") + mgr._notify_on_ready(state) + assert called == [("ready", "abc")] + +def test_register_post_completed_hook_invoked(): + called = [] + mgr = SimulationManager() + mgr.register_on_completed(lambda state: called.append(("done", state.sim_id))) + state = SimulationState(sim_id="abc", status="completed") + mgr._notify_on_completed(state) + assert called == [("done", "abc")] +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_simulation_hooks.py -v` +Expected: AttributeError on `register_on_ready` / `register_on_completed`. 
+ +- [ ] **Step 3: Add hook registry to SimulationManager** + +In `backend/app/services/simulation_manager.py`, find the `SimulationManager` class. Add to `__init__` (preserving existing init): +```python + self._on_ready_hooks: list = [] + self._on_completed_hooks: list = [] +``` + +Add methods to the class: +```python + def register_on_ready(self, fn) -> None: + self._on_ready_hooks.append(fn) + + def register_on_completed(self, fn) -> None: + self._on_completed_hooks.append(fn) + + def _notify_on_ready(self, state) -> None: + for fn in list(self._on_ready_hooks): + try: fn(state) + except Exception as e: + from app.utils.logger import get_logger + get_logger(__name__).warning(f"on_ready hook failed: {e!r}") + + def _notify_on_completed(self, state) -> None: + for fn in list(self._on_completed_hooks): + try: fn(state) + except Exception as e: + from app.utils.logger import get_logger + get_logger(__name__).warning(f"on_completed hook failed: {e!r}") +``` + +Locate the existing code that transitions state to `ready` (after `prepare_simulation` completes) and to `completed` (after simulation finishes). Insert calls to `self._notify_on_ready(state)` and `self._notify_on_completed(state)` immediately after each transition. If `SimulationState` is not a simple dataclass with `sim_id` and `status` attributes, adjust the test fixture to match the actual class shape (read the file first). + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_simulation_hooks.py -v` +Expected: 2 passed. 
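A minimal self-contained sketch of how the hook registry might be wired to the interview lifecycle. `HookRegistry`, `FakeState`, and the callbacks are hypothetical stand-ins for `SimulationManager`, `SimulationState`, and a call such as `InterviewOrchestrator.run_pre`; the sketch only illustrates that a failing hook is logged and does not block hooks registered after it.

```python
import logging
from dataclasses import dataclass
from typing import Callable

logger = logging.getLogger(__name__)


@dataclass
class FakeState:
    """Hypothetical stand-in for SimulationState."""
    sim_id: str
    status: str


class HookRegistry:
    """Hypothetical stand-in for the hook methods added to SimulationManager."""

    def __init__(self) -> None:
        self._on_ready: list[Callable[[FakeState], None]] = []

    def register_on_ready(self, fn: Callable[[FakeState], None]) -> None:
        self._on_ready.append(fn)

    def notify_on_ready(self, state: FakeState) -> None:
        # Iterate over a copy so a hook may register further hooks safely.
        for fn in list(self._on_ready):
            try:
                fn(state)
            except Exception as exc:
                logger.warning("on_ready hook failed: %r", exc)


def demo_hooks() -> list[str]:
    calls: list[str] = []
    registry = HookRegistry()

    def broken(state: FakeState) -> None:
        raise RuntimeError("boom")

    # The second hook stands in for something like `orch.run_pre()`.
    registry.register_on_ready(broken)
    registry.register_on_ready(lambda s: calls.append(f"run_pre:{s.sim_id}"))
    registry.notify_on_ready(FakeState(sim_id="abc", status="ready"))
    return calls
```

Hooks in this sketch run synchronously on the caller's thread; a long interview run registered this way would delay the state transition unless it is handed off to a worker.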
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/simulation_manager.py backend/tests/interviews/test_simulation_hooks.py +git commit -m "feat(interviews): on_ready / on_completed hook registry on SimulationManager" +``` + +--- + +### Task 14: InterviewSynthesizer + +**Files:** +- Create: `backend/app/services/interview_synthesizer.py` +- Test: `backend/tests/interviews/test_synthesizer.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_synthesizer.py +import json +from pathlib import Path +from app.services.interviews.storage import InterviewStore +from app.models.interview import InterviewPhase, SubagentKind, LikertResponse +from app.services.interview_synthesizer import InterviewSynthesizer + +def _seed_minimal(tmp_path: Path) -> InterviewStore: + store = InterviewStore(root=tmp_path, sim_id="s1") + rd = store.start_run(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + for i in range(3): + store.append_response(rd, LikertResponse( + agent_id=i, phase=InterviewPhase.T0, + responses={"stk_1": 3, "gov_1": 3}, confidence={"stk_1": 0.5, "gov_1": 0.5}, + )) + store.write_aggregate(rd, {"per_item": {}, "n_paired": 0}) + store.mark_latest(rd) + return store + +def test_synthesizer_runs_with_partial_data(tmp_path): + store = _seed_minimal(tmp_path) + synth = InterviewSynthesizer(store=store) + report = synth.run() + assert "limitations" in report.lower() + assert "stub mode" in report.lower() or "n_responded" in report.lower() + +def test_synthesizer_writes_files(tmp_path): + store = _seed_minimal(tmp_path) + synth = InterviewSynthesizer(store=store) + synth.run() + files = list((store.base / "synthesis").iterdir()) + names = {f.name for f in files} + assert "report.md" in names +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_synthesizer.py -v` +Expected: ImportError. 
+ +- [ ] **Step 3: Implement synthesiser** + +`backend/app/services/interview_synthesizer.py`: +```python +from __future__ import annotations +import csv +import json +from pathlib import Path +from app.models.interview import InterviewPhase, SubagentKind +from app.services.interviews.storage import InterviewStore + +class InterviewSynthesizer: + def __init__(self, store: InterviewStore): + self.store = store + + def _maybe(self, phase: InterviewPhase, sub: SubagentKind) -> dict | None: + run = self.store.latest_run(phase, sub) + if run is None: return None + agg = run / "aggregate.json" + if not agg.exists(): return None + return {"run_dir": str(run), "aggregate": json.loads(agg.read_text(encoding="utf-8"))} + + def _instrument_hashes(self) -> dict: + snap = self.store.base / "instruments_used.json" + if not snap.exists(): return {} + try: data = json.loads(snap.read_text(encoding="utf-8")) + except Exception: return {} + return {k: v.get("hash") for k, v in data.items()} + + def _limitations_text(self, present: dict[str, bool]) -> str: + lines = [ + "## Limitations", + "- **Simulated, not real stakeholders.** Responses reflect how the seed-document discourse " + "and the LLM jointly encode each stakeholder type, not what an actual fisher or NGO " + "staffer would say. 
The instrument measures the *model of the stakeholder*, not the stakeholder.", + "- **Memory digest is lossy.** Each agent's experience of OASIS is summarised to bounded length; " + "agents do not have full episodic recall.", + "- **LLM acquiescence and centrality bias.** Likert scales with LLM respondents skew toward 3–4 " + "of 5; check per-item distribution shape before drawing conclusions.", + "- **N is what it is.** `n_responded` and `n_failed` are printed verbatim per subagent; no smoothing.", + "- **Instrument provenance.** Hashes of frozen instruments are listed below; an identical run " + "is reproducible from these snapshots.", + ] + for k, ok in present.items(): + if not ok: + lines.append(f"- *{k}* subagent results are missing for this run.") + return "\n".join(lines) + + def run(self) -> str: + sections: list[str] = [] + sections.append("# Stakeholder Interview Synthesis\n") + + long_t0 = self._maybe(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + long_t1 = self._maybe(InterviewPhase.T1, SubagentKind.LONGITUDINAL) + if long_t1: + agg = long_t1["aggregate"] + sections.append("## Longitudinal opinion drift (T0 → T1)") + sections.append(f"- N paired: {agg.get('n_paired', 'NA')}") + per_item = agg.get("per_item", {}) + top = sorted(per_item.items(), + key=lambda kv: abs(kv[1].get("mean_delta") or 0), reverse=True)[:5] + sections.append("- Largest mean shifts:") + for k, v in top: + # mean_delta may be None (the sort key above allows for it); avoid a format error + delta = v.get("mean_delta") + delta_txt = f"{delta:+0.2f}" if delta is not None else "NA" + sections.append(f" - `{k}`: Δ̄ = {delta_txt} (n={v.get('n')})") + + diversity = self._maybe(InterviewPhase.T1, SubagentKind.DIVERSITY) + if diversity: + clusters = diversity["aggregate"].get("clusters", []) + sections.append("## Stakeholder typology") + sections.append(f"- N agents: {diversity['aggregate'].get('n', 'NA')}") + sections.append(f"- Clusters: {len(clusters)}") + for c in clusters: + sections.append(f" - cluster {c['cluster_id']}: n={c['n']}, " + f"top loadings = {list(c['top_loadings'].keys())[:5]}") + + delphi = self._maybe(InterviewPhase.T1, 
SubagentKind.DELPHI) + if delphi: + agg = delphi["aggregate"] + sections.append("## Delphi consensus") + sections.append(f"- Rounds completed: R1={agg.get('n_r1')}, R2={agg.get('n_r2')}, R3={agg.get('n_r3')}") + themes = agg.get("themes", []) + sections.append(f"- Themes: {[t.get('label') for t in themes]}") + + scenario = self._maybe(InterviewPhase.T1, SubagentKind.SCENARIO) + if scenario: + pol = scenario["aggregate"].get("polarity", {}) + sections.append("## Scenario evaluation") + for sid in sorted(pol): + v = pol[sid] + if v.get("n", 0) == 0: continue + sections.append( + f"- **{sid}**: n={v['n']}, desirability {v['mean_desirability']:.2f}, " + f"plausibility {v['mean_plausibility']:.2f}, impact {v['mean_impact']:.2f}, " + f"fairness {v['mean_fairness']:.2f}") + + sections.append("") + sections.append(self._limitations_text({ + "longitudinal": bool(long_t1), + "diversity": bool(diversity), + "delphi": bool(delphi), + "scenario": bool(scenario), + })) + sections.append("") + sections.append("### Instrument provenance") + for name, h in self._instrument_hashes().items(): + sections.append(f"- `{name}`: hash `{h}`") + + report = "\n\n".join(sections) + out_dir = self.store.base / "synthesis" + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "report.md").write_text(report, encoding="utf-8") + self._write_tidy_csv(out_dir / "exports" / "all_responses.csv") + return report + + def _write_tidy_csv(self, csv_path: Path) -> None: + csv_path.parent.mkdir(parents=True, exist_ok=True) + rows: list[dict] = [] + for phase in (InterviewPhase.T0, InterviewPhase.T1): + for sub in SubagentKind: + run = self.store.latest_run(phase, sub) + if run is None: continue + files = ["responses.jsonl", "round1_themes.jsonl", + "round2_ratings.jsonl", "round3_revisions.jsonl"] + for fname in files: + for rec in self.store.read_responses(run, fname): + flat = self._flatten(rec, phase=phase.value, subagent=sub.value) + rows.extend(flat) + if not rows: + 
csv_path.write_text("phase,subagent,agent_id,key,value\n", encoding="utf-8") + return + # Fixed column order, matching the header written for the empty case above + fieldnames = ["phase", "subagent", "agent_id", "key", "value"] + with csv_path.open("w", encoding="utf-8", newline="") as f: + w = csv.DictWriter(f, fieldnames=fieldnames) + w.writeheader() + for r in rows: w.writerow(r) + + def _flatten(self, rec: dict, *, phase: str, subagent: str) -> list[dict]: + out: list[dict] = [] + aid = rec.get("agent_id") + for key, val in rec.items(): + if key == "agent_id": continue + if isinstance(val, dict): + for k2, v2 in val.items(): + if isinstance(v2, dict): + for k3, v3 in v2.items(): + out.append({"phase": phase, "subagent": subagent, "agent_id": aid, + "key": f"{key}.{k2}.{k3}", "value": v3}) + else: + out.append({"phase": phase, "subagent": subagent, "agent_id": aid, + "key": f"{key}.{k2}", "value": v2}) + else: + out.append({"phase": phase, "subagent": subagent, "agent_id": aid, + "key": key, "value": val}) + return out +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_synthesizer.py -v` +Expected: 2 passed. 
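To make the tidy (long-format) export concrete, here is a small sketch of the flattening step: one CSV row per `(phase, subagent, agent_id, key, value)`, with nested dictionaries collapsed into dotted keys. The function `flatten_record` is a hypothetical recursive variant, an assumption for illustration; the plan's `_flatten` stops at two levels of nesting, and for records of that depth the two should produce the same rows. The sample record is invented.

```python
from typing import Any


def flatten_record(rec: dict[str, Any], *, phase: str, subagent: str) -> list[dict[str, Any]]:
    """Turn one response record into long-format rows with dotted keys."""
    agent_id = rec.get("agent_id")
    rows: list[dict[str, Any]] = []

    def walk(prefix: str, value: Any) -> None:
        if isinstance(value, dict):
            # Descend into nested dicts, extending the dotted key.
            for k, v in value.items():
                walk(f"{prefix}.{k}", v)
        else:
            rows.append({"phase": phase, "subagent": subagent,
                         "agent_id": agent_id, "key": prefix, "value": value})

    for key, val in rec.items():
        if key != "agent_id":  # agent_id becomes a column, not a row
            walk(key, val)
    return rows


# Invented sample record, shaped like a scenario response.
sample = {
    "agent_id": 7,
    "ratings": {"S1": {"desirability": 5, "plausibility": 3}},
    "open_comment": "ok",
}
rows = flatten_record(sample, phase="T1", subagent="scenario")
keys = [r["key"] for r in rows]
```

For the sample record, `keys` holds `ratings.S1.desirability`, `ratings.S1.plausibility`, and `open_comment`, so a downstream tool can filter by key prefix without knowing each subagent's schema.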
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interview_synthesizer.py backend/tests/interviews/test_synthesizer.py +git commit -m "feat(interviews): synthesiser emits cross-method report + tidy CSV + limitations section" +``` + +--- + +## Phase 5 — Adapters and API + +### Task 15: Persona + memory adapters + +**Files:** +- Create: `backend/app/services/interviews/adapters.py` +- Test: `backend/tests/interviews/test_adapters.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_adapters.py +import csv +import json +from pathlib import Path +from app.services.interviews.adapters import ( + FileSystemPersonaProvider, ZepMemoryProvider, +) + +def _write_reddit_profiles(tmp_path: Path): + data = [ + {"user_id": 0, "user_name": "fischer1", "name": "Fischer Müller", + "persona": "I am a small-scale Baltic fisher.", "profession": "fisher", "bio": ""}, + {"user_id": 1, "user_name": "ngo1", "name": "Ines NGO", + "persona": "I work for an environmental NGO.", "profession": "ngo_staff", "bio": ""}, + ] + p = tmp_path / "reddit_profiles.json" + p.write_text(json.dumps(data), encoding="utf-8") + return p + +def test_file_system_persona_provider_reads_reddit_json(tmp_path): + p = _write_reddit_profiles(tmp_path) + provider = FileSystemPersonaProvider(reddit_path=p, twitter_path=None) + personas = provider.all() + assert len(personas) == 2 + assert personas[0].name == "Fischer Müller" + assert personas[0].agent_id == 0 + +def test_zep_memory_provider_returns_empty_when_unavailable(): + class _BrokenReader: + def get_entity_with_context(self, *a, **kw): + raise RuntimeError("offline") + prov = ZepMemoryProvider(entity_reader=_BrokenReader(), graph_id="g1", + agent_to_entity={0: "uuid-zero"}) + d = prov.get_digest(0) + assert d.available is False + assert d.text != "" + +def test_zep_memory_provider_truncates_to_max_chars(): + class _R: + def get_entity_with_context(self, *a, **kw): + class _Ctx: + name = "X"; summary = "Y" 
+ related_edges = [{"fact": "very long fact " * 200}] + return _Ctx() + prov = ZepMemoryProvider(entity_reader=_R(), graph_id="g1", + agent_to_entity={5: "uuid-five"}) + d = prov.get_digest(5, max_chars=300) + assert d.available is True + assert len(d.text) <= 300 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_adapters.py -v` +Expected: ImportError. + +- [ ] **Step 3: Implement adapters** + +`backend/app/services/interviews/adapters.py`: +```python +from __future__ import annotations +import csv +import json +from pathlib import Path +from typing import Optional +from app.services.interviews.base import PersonaRecord, MemoryDigest + +class FileSystemPersonaProvider: + """Reads OASIS profiles from the simulation's `reddit_profiles.json` and/or `twitter_profiles.csv`. + + If both are present, agents from `reddit_profiles.json` take precedence; twitter-only agents are appended. + """ + def __init__(self, reddit_path: Optional[Path], twitter_path: Optional[Path]): + self.reddit_path = Path(reddit_path) if reddit_path else None + self.twitter_path = Path(twitter_path) if twitter_path else None + + def _load_reddit(self) -> list[PersonaRecord]: + if not self.reddit_path or not self.reddit_path.exists(): return [] + data = json.loads(self.reddit_path.read_text(encoding="utf-8")) + out = [] + for row in data: + out.append(PersonaRecord( + agent_id=int(row.get("user_id")), + name=str(row.get("name") or row.get("user_name") or f"agent_{row.get('user_id')}"), + persona=str(row.get("persona") or row.get("bio") or ""), + profession=row.get("profession"), + bio=row.get("bio"), + )) + return out + + def _load_twitter(self) -> list[PersonaRecord]: + if not self.twitter_path or not self.twitter_path.exists(): return [] + out = [] + with self.twitter_path.open("r", encoding="utf-8", newline="") as f: + for row in csv.DictReader(f): + if not row.get("user_id"): continue + out.append(PersonaRecord( + 
agent_id=int(row["user_id"]), + name=str(row.get("name") or row.get("user_name") or f"agent_{row['user_id']}"), + persona=str(row.get("persona") or row.get("bio") or ""), + profession=row.get("profession"), + bio=row.get("bio"), + )) + return out + + def all(self) -> list[PersonaRecord]: + reddit = self._load_reddit() + seen = {p.agent_id for p in reddit} + twitter = [p for p in self._load_twitter() if p.agent_id not in seen] + return reddit + twitter + +class ZepMemoryProvider: + """Builds a bounded memory digest per agent from Zep entity context. + + Maps `agent_id` (OASIS user_id) to a Zep entity UUID; falls back to the agent_id as a string. + """ + def __init__(self, entity_reader, graph_id: str, agent_to_entity: dict[int, str] | None = None): + self.reader = entity_reader + self.graph_id = graph_id + self.map = dict(agent_to_entity or {}) + + def get_digest(self, agent_id: int, max_chars: int = 2000) -> MemoryDigest: + entity_uuid = self.map.get(agent_id) or str(agent_id) + try: + ctx = self.reader.get_entity_with_context(self.graph_id, entity_uuid) + except Exception: + return MemoryDigest(text=f"[no memory for agent {agent_id}]", available=False) + parts: list[str] = [] + name = getattr(ctx, "name", None) + summary = getattr(ctx, "summary", None) + if name: parts.append(f"Name: {name}") + if summary: parts.append(f"Summary: {summary}") + edges = getattr(ctx, "related_edges", []) or [] + for e in edges[:20]: + fact = e.get("fact") if isinstance(e, dict) else getattr(e, "fact", None) + if fact: parts.append(f"- {fact}") + text = "\n".join(parts) + if len(text) > max_chars: text = text[: max_chars - 1] + "…" + return MemoryDigest(text=text or f"[empty memory for agent {agent_id}]", available=True) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_adapters.py -v` +Expected: 3 passed. 
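The truncation rule in `get_digest` is easy to get off by one: the ellipsis counts toward the budget, so the slice keeps `max_chars - 1` characters before appending it. A minimal standalone sketch of that rule, assuming a hypothetical helper name `bounded_digest` that is not part of the adapters module:

```python
def bounded_digest(parts: list[str], max_chars: int = 2000) -> str:
    """Join digest lines and cap the result at max_chars, ellipsis included.

    Hypothetical helper for illustration; it mirrors the slicing used in
    ZepMemoryProvider.get_digest but is not part of the plan's code.
    """
    text = "\n".join(parts)
    if len(text) > max_chars:
        # Reserve one character for the ellipsis so the cap is never exceeded.
        text = text[: max_chars - 1] + "…"
    return text


if __name__ == "__main__":
    parts = ["Name: X", "Summary: Y", "- " + "very long fact " * 200]
    print(len(bounded_digest(parts, max_chars=300)))
```

This is why the third test can assert `len(d.text) <= 300` rather than allowing one extra character.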
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/interviews/adapters.py backend/tests/interviews/test_adapters.py +git commit -m "feat(interviews): persona + Zep memory adapters bridging existing services to interview subsystem" +``` + +--- + +### Task 16: /api/interview Flask blueprint + +**Files:** +- Create: `backend/app/api/interview.py` +- Modify: `backend/app/api/__init__.py` +- Test: `backend/tests/interviews/test_api_interview.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_api_interview.py +import json +import os +from pathlib import Path +import pytest + +@pytest.fixture +def client(tmp_path, monkeypatch): + monkeypatch.setenv("LLM_STUB_MODE", "true") + monkeypatch.setenv("UPLOADS_DIR", str(tmp_path)) + from app.config import Config + Config.LLM_STUB_MODE = True + Config.UPLOADS_DIR = str(tmp_path) + # Seed a minimal reddit_profiles.json + sim_dir = tmp_path / "simulations" / "sim_test" + sim_dir.mkdir(parents=True) + profiles = [{"user_id": i, "user_name": f"u{i}", "name": f"A{i}", + "persona": "p", "profession": "fisher"} for i in range(3)] + (sim_dir / "reddit_profiles.json").write_text(json.dumps(profiles), encoding="utf-8") + from flask import Flask + from app.api import register_blueprints + app = Flask(__name__) + register_blueprints(app) + return app.test_client() + +def test_post_pre_returns_task_id(client): + res = client.post("/api/interview/sim_test/pre") + assert res.status_code == 200 + body = res.get_json() + assert body["success"] is True + assert "task_id" in body["data"] + +def test_status_endpoint_returns_progress(client): + res = client.post("/api/interview/sim_test/pre") + task_id = res.get_json()["data"]["task_id"] + res2 = client.get(f"/api/interview/sim_test/status?task_id={task_id}") + assert res2.status_code == 200 + assert "status" in res2.get_json()["data"] + +def test_unknown_subagent_returns_400(client): + res = client.post("/api/interview/sim_test/rerun", + 
json={"subagent": "nonsense"}) + assert res.status_code == 400 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_api_interview.py -v` +Expected: ImportError / 404. + +- [ ] **Step 3: Check current `api/__init__.py`** + +Read `backend/app/api/__init__.py` and identify how `graph_bp`, `simulation_bp`, `report_bp` are registered. The test expects a `register_blueprints(app)` helper — if one doesn't exist, add it. + +- [ ] **Step 4: Modify `api/__init__.py`** + +Replace contents (preserving existing blueprint imports — adjust to match actual file): +```python +from flask import Flask +from .graph import graph_bp +from .simulation import simulation_bp +from .report import report_bp +from .interview import interview_bp + +def register_blueprints(app: Flask) -> None: + app.register_blueprint(graph_bp, url_prefix="/api/graph") + app.register_blueprint(simulation_bp, url_prefix="/api/simulation") + app.register_blueprint(report_bp, url_prefix="/api/report") + app.register_blueprint(interview_bp, url_prefix="/api/interview") +``` + +If the existing app factory in `app/__init__.py` already calls register manually, update it to call `register_blueprints(app)` instead. 
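The status test in Step 1 polls a task that moves through `queued`, `running`, and then `completed` or `failed`. A stdlib-only sketch of that lifecycle, without Flask, may help when reading the blueprint in Step 5. The function names `start_task`, `run_task`, and `task_status` are illustrative assumptions, and `start_task` returns the thread only so the example can wait on it:

```python
import threading
import uuid

_TASKS: dict[str, dict] = {}
_LOCK = threading.Lock()


def run_task(task_id: str, fn) -> None:
    """Run fn in the current thread and record the outcome under the lock."""
    with _LOCK:
        _TASKS[task_id]["status"] = "running"
    try:
        result = fn()
        with _LOCK:
            _TASKS[task_id].update(status="completed", result=result)
    except Exception as exc:  # best-effort: record the failure, never raise
        with _LOCK:
            _TASKS[task_id].update(status="failed", error=repr(exc))


def start_task(fn) -> tuple[str, threading.Thread]:
    """Register a queued task, start it in a daemon thread, return its id."""
    task_id = uuid.uuid4().hex[:12]
    with _LOCK:
        _TASKS[task_id] = {"status": "queued", "result": None, "error": None}
    thread = threading.Thread(target=run_task, args=(task_id, fn), daemon=True)
    thread.start()
    return task_id, thread


def task_status(task_id: str):
    """Return a copy of the task record, or None for an unknown id."""
    with _LOCK:
        task = _TASKS.get(task_id)
        return dict(task) if task is not None else None
```

Because the registry is a module-level dict, task state is lost on restart and is not shared between worker processes, so this pattern only suits a single-process development server.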
+ +- [ ] **Step 5: Implement blueprint** + +`backend/app/api/interview.py`: +```python +from __future__ import annotations +import threading +import traceback +import uuid +from pathlib import Path +from flask import Blueprint, jsonify, request, send_file +from app.config import Config +from app.models.interview import SubagentKind, InterviewPhase +from app.services.interviews.adapters import FileSystemPersonaProvider, ZepMemoryProvider +from app.services.interviews.zep_writer import InterviewZepWriter +from app.services.interview_orchestrator import InterviewOrchestrator +from app.services.interview_synthesizer import InterviewSynthesizer +from app.services.interviews.storage import InterviewStore +from app.utils.llm_client import LLMClient + +interview_bp = Blueprint("interview", __name__) +_TASKS: dict[str, dict] = {} +_LOCK = threading.Lock() + +INSTRUMENT_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments" + +def _uploads_root() -> Path: + return Path(getattr(Config, "UPLOADS_DIR", "uploads")) + +def _build_orchestrator(sim_id: str) -> InterviewOrchestrator: + sim_dir = _uploads_root() / "simulations" / sim_id + reddit = sim_dir / "reddit_profiles.json" + twitter = sim_dir / "twitter_profiles.csv" + personas = FileSystemPersonaProvider(reddit_path=reddit if reddit.exists() else None, + twitter_path=twitter if twitter.exists() else None) + # Zep memory + writer: best-effort; in stub/test mode the writer no-ops on exceptions + class _NullUpdater: + def add_text_episode(self, *a, **kw): return None + try: + from app.services.zep_entity_reader import ZepEntityReader + from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater + graph_id = (sim_dir / "graph_id.txt").read_text().strip() if (sim_dir / "graph_id.txt").exists() else "" + reader = ZepEntityReader() + updater = ZepGraphMemoryUpdater() + memory = ZepMemoryProvider(reader, graph_id=graph_id) + zep_writer = InterviewZepWriter(memory_updater=updater, graph_id=graph_id) + except 
Exception: + class _Mem: + def get_digest(self, agent_id, max_chars=2000): + from app.services.interviews.base import MemoryDigest + return MemoryDigest(text="[memory unavailable]", available=False) + memory = _Mem() + zep_writer = InterviewZepWriter(memory_updater=_NullUpdater(), graph_id="") + llm = LLMClient(api_key=Config.LLM_API_KEY, base_url=Config.LLM_BASE_URL, + model=Config.LLM_MODEL_NAME) + return InterviewOrchestrator( + llm=llm, memory=memory, personas=personas, + instrument_dir=INSTRUMENT_DIR, store_root=_uploads_root(), sim_id=sim_id, + zep_writer=zep_writer, max_workers=Config.INTERVIEW_MAX_WORKERS, + language=Config.INTERVIEW_DEFAULT_LANGUAGE, + ) + +def _run_task(task_id: str, fn) -> None: + with _LOCK: + _TASKS[task_id] = {"status": "running", "progress": {}, "result": None, "error": None} + try: + result = fn(task_id) + with _LOCK: + _TASKS[task_id]["status"] = "completed"; _TASKS[task_id]["result"] = result + except Exception as e: + with _LOCK: + _TASKS[task_id]["status"] = "failed" + _TASKS[task_id]["error"] = repr(e) + _TASKS[task_id]["traceback"] = traceback.format_exc() + +def _start_task(fn) -> str: + task_id = uuid.uuid4().hex[:12] + with _LOCK: + _TASKS[task_id] = {"status": "queued", "progress": {}, "result": None, "error": None} + threading.Thread(target=_run_task, args=(task_id, fn), daemon=True).start() + return task_id + +def _envelope(data=None, error=None, status: int = 200): + body = {"success": error is None, "data": data or {}, "error": error} + return jsonify(body), status + +@interview_bp.route("/<sim_id>/pre", methods=["POST"]) +def post_pre(sim_id: str): + orch = _build_orchestrator(sim_id) + task_id = _start_task(lambda tid: orch.run_pre()) + return _envelope({"task_id": task_id}) + +@interview_bp.route("/<sim_id>/post", methods=["POST"]) +def post_post(sim_id: str): + orch = _build_orchestrator(sim_id) + def run(tid): + out = orch.run_post() + synth = InterviewSynthesizer(store=orch.store) + out["synthesis"] = synth.run()[:1000] # short 
preview + return out + task_id = _start_task(run) + return _envelope({"task_id": task_id}) + +@interview_bp.route("/<sim_id>/rerun", methods=["POST"]) +def post_rerun(sim_id: str): + body = request.get_json(silent=True) or {} + sub = body.get("subagent") + try: subagent = SubagentKind(sub) + except ValueError: return _envelope(error=f"unknown subagent {sub!r}", status=400) + orch = _build_orchestrator(sim_id) + task_id = _start_task(lambda tid: orch.rerun(subagent)) + return _envelope({"task_id": task_id}) + +@interview_bp.route("/<sim_id>/status", methods=["GET"]) +def get_status(sim_id: str): + task_id = request.args.get("task_id") + with _LOCK: + task = _TASKS.get(task_id) + if task is None: return _envelope(error="unknown task_id", status=404) + return _envelope({"status": task["status"], "progress": task.get("progress", {}), + "result": task.get("result"), "error": task.get("error")}) + +@interview_bp.route("/<sim_id>/results/<subagent>", methods=["GET"]) +def get_results(sim_id: str, subagent: str): + try: sub = SubagentKind(subagent) + except ValueError: return _envelope(error=f"unknown subagent {subagent!r}", status=400) + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + phase = InterviewPhase.T1 if sub != SubagentKind.LONGITUDINAL else InterviewPhase.T1 + run = store.latest_run(phase, sub) + if run is None: return _envelope(error="no results yet", status=404) + agg = (run / "aggregate.json") + if not agg.exists(): return _envelope(error="aggregate missing", status=404) + import json as _j + return _envelope({"aggregate": _j.loads(agg.read_text(encoding="utf-8")), + "run_dir": str(run)}) + +@interview_bp.route("/<sim_id>/results/synthesis", methods=["GET"]) +def get_synthesis(sim_id: str): + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + report = store.base / "synthesis" / "report.md" + if not report.exists(): + synth = InterviewSynthesizer(store=store) + synth.run() + return _envelope({"report_markdown": report.read_text(encoding="utf-8")}) + 
+@interview_bp.route("/<sim_id>/export.csv", methods=["GET"]) +def get_export_csv(sim_id: str): + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + csv_path = store.base / "synthesis" / "exports" / "all_responses.csv" + if not csv_path.exists(): + InterviewSynthesizer(store=store).run() + return send_file(csv_path, mimetype="text/csv", as_attachment=True, + download_name=f"{sim_id}_interviews.csv") +``` + +- [ ] **Step 6: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_api_interview.py -v` +Expected: 3 passed. + +- [ ] **Step 7: Commit** + +```bash +git add backend/app/api/__init__.py backend/app/api/interview.py backend/tests/interviews/test_api_interview.py +git commit -m "feat(interviews): Flask blueprint /api/interview with task-based async + CSV export" +``` + +--- + +## Phase 6 — Integration + +### Task 17: End-to-end pipeline test (stub LLM) + +**Files:** +- Create: `backend/tests/integration/__init__.py` +- Test: `backend/tests/integration/test_interview_pipeline.py` + +- [ ] **Step 1: Write failing test** + +Create `backend/tests/integration/__init__.py` (empty), then: + +```python +# backend/tests/integration/test_interview_pipeline.py +import json +import pytest +from pathlib import Path +from app.config import Config +from app.models.interview import SubagentKind, InterviewPhase +from app.services.interviews.adapters import FileSystemPersonaProvider +from app.services.interviews.base import MemoryDigest +from app.services.interviews.zep_writer import InterviewZepWriter +from app.services.interview_orchestrator import InterviewOrchestrator +from app.services.interview_synthesizer import InterviewSynthesizer +from app.utils.llm_client import LLMClient + +pytestmark = pytest.mark.integration + +INST_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments" + +class _NullUpdater: + def __init__(self): self.events = [] + def add_text_episode(self, graph_id, text): self.events.append(text) + +class 
_StaticMem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text=f"agent {agent_id} memory snippet", available=True) + +@pytest.fixture +def seeded_uploads(tmp_path, monkeypatch): + monkeypatch.setenv("LLM_STUB_MODE", "true") + Config.LLM_STUB_MODE = True + sim_dir = tmp_path / "simulations" / "intg_sim" + sim_dir.mkdir(parents=True) + profiles = [{"user_id": i, "user_name": f"u{i}", "name": f"A{i}", + "persona": "stakeholder p", "profession": "fisher"} for i in range(5)] + (sim_dir / "reddit_profiles.json").write_text(json.dumps(profiles), encoding="utf-8") + return tmp_path + +def _make_orch(tmp_path): + sim_dir = tmp_path / "simulations" / "intg_sim" + personas = FileSystemPersonaProvider( + reddit_path=sim_dir / "reddit_profiles.json", twitter_path=None, + ) + llm = LLMClient(api_key="x", base_url="x", model="x") + updater = _NullUpdater() + writer = InterviewZepWriter(memory_updater=updater, graph_id="g") + return InterviewOrchestrator( + llm=llm, memory=_StaticMem(), personas=personas, + instrument_dir=INST_DIR, store_root=tmp_path, sim_id="intg_sim", + zep_writer=writer, max_workers=2, language="de", + ) + +def test_pipeline_runs_pre_then_post_then_synthesis(seeded_uploads): + tmp = seeded_uploads + orch = _make_orch(tmp) + + pre = orch.run_pre() + assert pre["longitudinal"]["n_responded"] >= 1 + + post = orch.run_post() + assert "longitudinal" in post + assert "diversity" in post + assert "scenario" in post + assert "delphi" in post + + synth = InterviewSynthesizer(store=orch.store) + report = synth.run() + assert "Stakeholder Interview Synthesis" in report + assert "Limitations" in report + + csv_path = orch.store.base / "synthesis" / "exports" / "all_responses.csv" + assert csv_path.exists() + lines = csv_path.read_text().splitlines() + assert lines[0].startswith("agent_id,") or "agent_id" in lines[0] + +def test_idempotent_rerun_creates_new_run_id(seeded_uploads): + tmp = seeded_uploads + orch = _make_orch(tmp) + orch.run_pre() + 
first = orch.run_post() + second = orch.rerun(SubagentKind.SCENARIO) + first_scn = first["scenario"]["run_dir"] + second_scn = second["scenario"]["run_dir"] + assert first_scn != second_scn +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/integration/test_interview_pipeline.py -v -m integration` +Expected: most likely ValidationError from the stub LLM's canned JSON not satisfying every subagent's strict validator (forced Q-sort distribution, scenarios, Delphi). This is the signal to enrich the stub. + +- [ ] **Step 3: Enrich `_stub_response_json` in `LLMClient` to satisfy each subagent** + +Read the current `_stub_response_json` (Task 4). Replace its body with content-aware stubs by inspecting the user message text. In `backend/app/utils/llm_client.py`, replace `_stub_response_json` with: + +```python + def _stub_response_json(self, messages: list[dict]) -> dict: + import hashlib, json as _json + sys_msg = next((m["content"] for m in messages if m.get("role") == "system"), "") + usr_msg = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "") + h = hashlib.sha256((sys_msg + "|" + usr_msg).encode("utf-8")).hexdigest() + seed = int(h[:8], 16) + rng = (seed % 5) + 1 + + # Longitudinal Likert (12 items) + if all(tok in usr_msg for tok in ("stk_1", "gov_1", "mkt_1", "clm_1")): + ids = ["stk_1","stk_2","stk_3","gov_1","gov_2","gov_3", + "mkt_1","mkt_2","mkt_3","clm_1","clm_2","clm_3"] + return {"responses": {k: ((seed >> (i*3)) % 5) + 1 for i, k in enumerate(ids)}, + "confidence": {k: 0.6 for k in ids}, + "open_comment": f"stub:{h[:8]}"} + + # Diversity Q-sort: 24 statements + 6 axes, forced distribution 2,3,4,6,4,3,2 + if "st_01" in usr_msg and "ax_pres_extr" in usr_msg: + buckets = [-3]*2 + [-2]*3 + [-1]*4 + [0]*6 + [1]*4 + [2]*3 + [3]*2 + stmts = [f"st_{i+1:02d}" for i in range(24)] + # shuffle deterministically + order = sorted(range(24), key=lambda i: (h[i % len(h)], i)) + placements = 
{stmts[i]: buckets[order.index(i)] for i in range(24)} + return { + "placements": placements, + "likert_axes": {a: ((seed >> (j*3)) % 7) + 1 for j, a in enumerate( + ["ax_pres_extr","ax_loc_eu","ax_sci_trad", + "ax_ind_col","ax_short_long","ax_mkt_reg"])}, + } + + # Scenario: S1..S4 × 4 dims + if all(s in usr_msg for s in ("S1:", "S2:", "S3:", "S4:")): + return {"ratings": {sid: { + "desirability": ((seed >> (i*3)) % 7) + 1, + "plausibility": ((seed >> (i*3+1)) % 7) + 1, + "impact_on_my_group": ((seed >> (i*3+2)) % 7) + 1, + "fairness": ((seed >> (i*3+4)) % 7) + 1, + "if_woke_up_response": f"act-{sid}-{h[:4]}", + } for i, sid in enumerate(["S1","S2","S3","S4"])}} + + # Delphi R1: q1..q4 free text + if "q1" in usr_msg and "q2" in usr_msg and "Bewerten" not in usr_msg and "Sie sehen" not in usr_msg: + return {"answers": {qid: f"stub-themes-{qid}-{h[:4]}" for qid in ("q1","q2","q3","q4")}} + + # Delphi theme extraction (no in-character system prompt) + if "extract distinct thematic codes" in sys_msg: + return {"themes": [{"theme_id": f"theme_{i}", "label": f"Thema {i}"} for i in range(5)]} + + # Delphi R2 (rate) or R3 (revise) + if "Bewerten Sie jedes Thema" in usr_msg or "Sie sehen unten" in usr_msg \ + or "Rate each theme" in usr_msg or "Below are the anonymised" in usr_msg: + theme_ids = [f"theme_{i}" for i in range(5)] + out = {"ratings": {tid: {"importance": ((seed >> (i*2)) % 5) + 1, + "plausibility": ((seed >> (i*2+1)) % 5) + 1} + for i, tid in enumerate(theme_ids)}} + if "Sie sehen unten" in usr_msg or "Below are the anonymised" in usr_msg: + out["justification"] = "stub-revision" + return out + + # Fallback + return {"stub_key": h[:12], "value": rng} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/integration/test_interview_pipeline.py -v -m integration` +Expected: 2 passed. 
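The Q-sort branch of the stub only helps if its placements satisfy the forced distribution 2, 3, 4, 6, 4, 3, 2 across the seven buckets. The sketch below reproduces the stub's deterministic placement in isolation and checks the bucket counts. The checker `satisfies_forced_distribution` is an assumption about what the Diversity validator enforces, not a copy of it:

```python
import hashlib
from collections import Counter

# Bucket score -> number of statements allowed in that bucket (24 in total).
FORCED = {-3: 2, -2: 3, -1: 4, 0: 6, 1: 4, 2: 3, 3: 2}


def stub_placements(prompt: str) -> dict[str, int]:
    """Deterministically assign 24 statements to buckets from a prompt hash."""
    h = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
    buckets = [score for score, n in sorted(FORCED.items()) for _ in range(n)]
    stmts = [f"st_{i + 1:02d}" for i in range(24)]
    # Order statement indices by a hash character, ties broken by index.
    order = sorted(range(24), key=lambda i: (h[i % len(h)], i))
    # The statement at rank r in `order` receives the r-th bucket slot.
    return {stmts[idx]: buckets[rank] for rank, idx in enumerate(order)}


def satisfies_forced_distribution(placements: dict[str, int]) -> bool:
    """True when every bucket holds exactly its required number of statements."""
    return Counter(placements.values()) == Counter(FORCED)
```

Because the placement is a permutation of a fixed bucket list, the distribution holds for any prompt, and the same prompt always yields the same sort.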
+ +- [ ] **Step 5: Commit** + +```bash +git add backend/app/utils/llm_client.py backend/tests/integration/__init__.py backend/tests/integration/test_interview_pipeline.py +git commit -m "test(interviews): end-to-end pipeline test + content-aware LLM stubs for all 4 subagents" +``` + +--- + +## Phase 7 — Frontend + +Note: this project has no frontend test framework. Tasks below use the build (`npm run build`) plus a manual smoke check via `npm run dev` as the verification gate. Commit after each task once the build is green. + +### Task 18: Step4bInterviews.vue scaffold + tab shell + +**Files:** +- Create: `frontend/src/components/Step4bInterviews.vue` +- Create: `frontend/src/api/interview.js` +- Modify: `frontend/src/App.vue` (or the parent that orchestrates Step1..Step5 — locate and adjust) + +- [ ] **Step 1: Add API client module** + +`frontend/src/api/interview.js`: +```javascript +import { api } from "./index" + +export async function startPre(simId) { + const r = await api.post(`/api/interview/${simId}/pre`) + return r.data +} +export async function startPost(simId) { + const r = await api.post(`/api/interview/${simId}/post`) + return r.data +} +export async function rerun(simId, subagent) { + const r = await api.post(`/api/interview/${simId}/rerun`, { subagent }) + return r.data +} +export async function getStatus(simId, taskId) { + const r = await api.get(`/api/interview/${simId}/status`, { params: { task_id: taskId } }) + return r.data +} +export async function getResults(simId, subagent) { + const r = await api.get(`/api/interview/${simId}/results/${subagent}`) + return r.data +} +export async function getSynthesis(simId) { + const r = await api.get(`/api/interview/${simId}/results/synthesis`) + return r.data +} +export function exportCsvUrl(simId) { + return `/api/interview/${simId}/export.csv` +} +``` + +- [ ] **Step 2: Implement Step4bInterviews.vue scaffold** + +`frontend/src/components/Step4bInterviews.vue`: +```vue + + + + + +``` + +- [ ] **Step 3: 
Create placeholder panel components (to be filled in Task 19)** + +Create five empty-but-renderable Vue components so the scaffold compiles: + +`frontend/src/components/interviews/LongitudinalPanel.vue`: +```vue + + +``` + +Repeat the same pattern (changing only the inner text) for `DiversityPanel.vue`, `DelphiPanel.vue`, `ScenarioPanel.vue`, `SynthesisPanel.vue` in `frontend/src/components/interviews/`. + +- [ ] **Step 4: Wire Step4b into parent navigation** + +Read `frontend/src/App.vue` (or wherever Step1..Step5 are rendered). Locate the routing/visibility logic. Add a Step4b state between Step4 and Step5, and import `Step4bInterviews` from `./components/Step4bInterviews.vue`. Pass `:sim-id="currentSimId"` where the others receive the sim id. Add i18n keys to `locales/en.json`, `locales/de.json`, `locales/zh.json`: +```json +"interview": { + "title": "Stakeholder interviews", + "subtitle": "Four independent surveys of the simulated stakeholder population.", + "runAll": "Run all post-simulation interviews", + "downloadCsv": "Download CSV", + "tab": { + "longitudinal": "Longitudinal (Δ)", + "diversity": "Diversity", + "delphi": "Delphi", + "scenario": "Scenarios", + "synthesis": "Synthesis" + } +} +``` + +- [ ] **Step 5: Build to verify it compiles** + +Run: `cd frontend && npm run build` +Expected: build succeeds with no errors. 
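Step 4 adds the same `interview` block to three locale files, and a key missing from one of them tends to surface only at runtime as an untranslated label. A small optional check, sketched here in Python under the assumption that the locale files are plain nested JSON; `key_paths` and `missing_keys` are hypothetical helpers, not part of the plan:

```python
import json
from pathlib import Path


def key_paths(node: dict, prefix: str = "") -> set[str]:
    """Collect dotted paths of all leaf keys in a nested dict."""
    paths: set[str] = set()
    for key, value in node.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            paths |= key_paths(value, path)
        else:
            paths.add(path)
    return paths


def missing_keys(locales: dict[str, dict]) -> dict[str, set[str]]:
    """For each locale, return key paths present elsewhere but absent here."""
    all_paths = set().union(*(key_paths(d) for d in locales.values()))
    return {name: all_paths - key_paths(d) for name, d in locales.items()}


def load_locales(directory: Path, names=("en", "de", "zh")) -> dict[str, dict]:
    """Read <name>.json for each locale name from the given directory."""
    return {
        name: json.loads((directory / f"{name}.json").read_text(encoding="utf-8"))
        for name in names
    }
```

Running `missing_keys(load_locales(Path("locales")))` from the project root should return an empty set for every locale once Step 4 is complete.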
+ +- [ ] **Step 6: Commit** + +```bash +git add frontend/src/api/interview.js frontend/src/components/Step4bInterviews.vue \ + frontend/src/components/interviews/*.vue frontend/src/App.vue \ + locales/*.json +git commit -m "feat(interviews): Step4b Vue scaffold with five-tab navigation, API client, i18n keys" +``` + +--- + +### Task 19: Per-tab d3 visualisations + +**Files:** +- Modify: `frontend/src/components/interviews/LongitudinalPanel.vue` +- Modify: `frontend/src/components/interviews/DiversityPanel.vue` +- Modify: `frontend/src/components/interviews/DelphiPanel.vue` +- Modify: `frontend/src/components/interviews/ScenarioPanel.vue` +- Modify: `frontend/src/components/interviews/SynthesisPanel.vue` + +For each panel, fetch the relevant aggregate via the API on mount, then render with d3. The five implementations follow the same structure; each shows the full content below. + +- [ ] **Step 1: Longitudinal panel — heatmap of Δ̄ per item** + +`frontend/src/components/interviews/LongitudinalPanel.vue`: +```vue + + + + + +``` + +- [ ] **Step 2: Diversity panel — PCA scatter coloured by cluster** + +`frontend/src/components/interviews/DiversityPanel.vue`: +```vue + + + + + +``` + +- [ ] **Step 3: Delphi panel — convergence bar chart (R2 IQR vs R3 IQR per theme)** + +`frontend/src/components/interviews/DelphiPanel.vue`: +```vue + + + + + +``` + +- [ ] **Step 4: Scenario panel — polarity quadrant (desirability × plausibility)** + +`frontend/src/components/interviews/ScenarioPanel.vue`: +```vue + + + + + +``` + +- [ ] **Step 5: Synthesis panel — render markdown report** + +`frontend/src/components/interviews/SynthesisPanel.vue`: +```vue + + + + + +``` + +- [ ] **Step 6: Build + smoke test** + +Run: `cd frontend && npm run build` +Expected: build succeeds. Then `cd .. && npm run dev` and manually visit Step4b for a completed `sim_id` — verify all five tabs render without console errors. 
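When smoke-testing the Delphi tab, it helps to know what the bars should look like. The sketch below computes the quantity the chart compares, the interquartile range of ratings per theme in round 2 versus round 3. The input shape (theme id mapped to a list of ratings) and the `narrowed` flag are assumptions for illustration; the real aggregate format comes from the Delphi subagent:

```python
from statistics import quantiles


def iqr(values: list[float]) -> float:
    """Interquartile range; 0.0 when there are too few ratings to split."""
    if len(values) < 2:
        return 0.0
    q1, _median, q3 = quantiles(values, n=4)
    return q3 - q1


def convergence(r2: dict[str, list[float]], r3: dict[str, list[float]]) -> dict[str, dict]:
    """Compare rating spread per theme between Delphi rounds 2 and 3."""
    out = {}
    for theme in r2:
        if theme not in r3:
            continue  # a theme rated in only one round cannot be compared
        before, after = iqr(r2[theme]), iqr(r3[theme])
        out[theme] = {"r2_iqr": before, "r3_iqr": after, "narrowed": after < before}
    return out
```

A theme whose R3 bar is shorter than its R2 bar indicates that the panel's ratings moved closer together. The backend may use a different quantile method, so exact values can differ slightly from this sketch.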
+ +- [ ] **Step 7: Commit** + +```bash +git add frontend/src/components/interviews/*.vue +git commit -m "feat(interviews): d3 visualisations for longitudinal Δ, diversity PCA, Delphi, scenario polarity, synthesis" +``` + +--- + +### Task 20: Auto-trigger pre-survey on simulation `ready` and post-survey on `completed` + +**Files:** +- Create: `backend/app/services/interviews/lifecycle.py` +- Modify: `backend/app/__init__.py` (app factory) to install the hook +- Test: `backend/tests/interviews/test_lifecycle.py` + +- [ ] **Step 1: Write failing test** + +```python +# backend/tests/interviews/test_lifecycle.py +from app.services.interviews.lifecycle import install_hooks + +class _StubMgr: + def __init__(self): self.ready = []; self.completed = [] + def register_on_ready(self, fn): self.ready.append(fn) + def register_on_completed(self, fn): self.completed.append(fn) + +def test_install_hooks_registers_two_callables(): + mgr = _StubMgr() + install_hooks(mgr) + assert len(mgr.ready) == 1 + assert len(mgr.completed) == 1 + assert callable(mgr.ready[0]) + assert callable(mgr.completed[0]) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd backend && uv run pytest tests/interviews/test_lifecycle.py -v` +Expected: ImportError. + +- [ ] **Step 3: Implement lifecycle hook installer** + +`backend/app/services/interviews/lifecycle.py`: +```python +from __future__ import annotations +import threading +from app.utils.logger import get_logger + +logger = get_logger(__name__) + +def install_hooks(manager) -> None: + """Attach interview lifecycle callbacks to a SimulationManager. + + on_ready → spawn T0 longitudinal in a background thread + on_completed → spawn full post-sim batch in a background thread + Hooks are best-effort; failures only log. 
+ """ + def _on_ready(state) -> None: + sim_id = getattr(state, "sim_id", None) or getattr(state, "id", None) + if not sim_id: return + threading.Thread(target=_run_pre, args=(sim_id,), daemon=True).start() + + def _on_completed(state) -> None: + sim_id = getattr(state, "sim_id", None) or getattr(state, "id", None) + if not sim_id: return + threading.Thread(target=_run_post, args=(sim_id,), daemon=True).start() + + manager.register_on_ready(_on_ready) + manager.register_on_completed(_on_completed) + +def _run_pre(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + orch = _build_orchestrator(sim_id) + orch.run_pre() + except Exception as e: + logger.warning(f"auto pre-survey failed for {sim_id}: {e!r}") + +def _run_post(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + from app.services.interview_synthesizer import InterviewSynthesizer + orch = _build_orchestrator(sim_id) + orch.run_post() + InterviewSynthesizer(store=orch.store).run() + except Exception as e: + logger.warning(f"auto post-survey failed for {sim_id}: {e!r}") +``` + +- [ ] **Step 4: Wire into app factory** + +Read `backend/app/__init__.py`. Locate where `SimulationManager` (or its singleton) is instantiated. Add: +```python + from app.services.interviews.lifecycle import install_hooks + install_hooks(simulation_manager) +``` +immediately after the manager is constructed. If `simulation_manager` is module-level in `simulation_manager.py`, attach the hooks at the bottom of that module instead — the goal is "install once on app startup". + +- [ ] **Step 5: Run test to verify it passes** + +Run: `cd backend && uv run pytest tests/interviews/test_lifecycle.py -v` +Expected: 1 passed. + +- [ ] **Step 6: Full backend test suite** + +Run: `cd backend && uv run pytest -m "not integration" -q` +Expected: all unit tests pass. + +Run: `cd backend && uv run pytest -m integration -q` +Expected: integration tests pass. 
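`install_hooks` assumes the manager already exposes `register_on_ready` and `register_on_completed`. If the existing `SimulationManager` has no such methods, something like the following registry would need to be added to it first. This is a sketch under that assumption; `HookRegistry`, `notify_ready`, and `notify_completed` are hypothetical names, and the real manager would call the notify methods at its own state transitions:

```python
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)

Hook = Callable[[Any], None]


class HookRegistry:
    """Minimal callback registry matching the interface install_hooks expects."""

    def __init__(self) -> None:
        self._on_ready: list[Hook] = []
        self._on_completed: list[Hook] = []

    def register_on_ready(self, fn: Hook) -> None:
        self._on_ready.append(fn)

    def register_on_completed(self, fn: Hook) -> None:
        self._on_completed.append(fn)

    def _fire(self, hooks: list[Hook], state: Any) -> None:
        for hook in hooks:
            try:
                hook(state)
            except Exception as exc:
                # Best-effort: a failing hook must not break the simulation
                # or prevent later hooks from running.
                logger.warning("lifecycle hook failed: %r", exc)

    def notify_ready(self, state: Any) -> None:
        self._fire(self._on_ready, state)

    def notify_completed(self, state: Any) -> None:
        self._fire(self._on_completed, state)
```

Isolating each hook in its own `try` keeps the "failures only log" contract from the `install_hooks` docstring on the manager side as well.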
+ +- [ ] **Step 7: Commit** + +```bash +git add backend/app/services/interviews/lifecycle.py backend/app/__init__.py backend/tests/interviews/test_lifecycle.py +git commit -m "feat(interviews): auto-trigger pre and post interviews via SimulationManager lifecycle hooks" +``` + +--- + +## Final verification + +- [ ] **Run full backend test suite** + +Run: `cd backend && uv run pytest -q` +Expected: every test passes. + +- [ ] **Run frontend build** + +Run: `cd frontend && npm run build` +Expected: build succeeds with no errors. + +- [ ] **Smoke test the running app** + +Run: `npm run dev` from project root. With an existing completed simulation: +1. Navigate to Step4b in the UI +2. Click "Run all post-simulation interviews" +3. Wait for status to reach `completed` +4. Verify each of the five tabs renders without console errors +5. Click "Download CSV" and confirm the file downloads + +- [ ] **Verify spec coverage** + +Re-open `docs/superpowers/specs/2026-05-23-stakeholder-interview-subagents-design.md` and confirm every section in the spec has a corresponding task: + +- §3 architectural approach (deterministic runners) → Tasks 5–9 +- §4 file structure + lifecycle hooks → Tasks 2–14, 20 +- §5.1–5.4 four instruments → Tasks 6, 7, 8, 9 +- §5.5 in-character prompting + structured output + cost guardrails → Tasks 4, 5 +- §6.1 storage layout → Task 10 +- §6.2 Zep integration → Task 11 +- §6.3 API surface (all 7 endpoints) → Task 16 +- §6.4 parallelism + token guard → Task 12 (parallelism); token guard sits in `Config.INTERVIEW_MAX_TOKENS_PER_RUN` from Task 1 — *open: enforcement not implemented in v1; flag if you want it added* +- §6.5 frontend Step4b + per-tab viz → Tasks 18, 19 +- §7 error handling (per-agent isolation, schema retry, idempotency) → Tasks 5, 10, 12 +- §8 validation (schema, instrument, plausibility flags) → Tasks 2, 3 (schema + instrument); plausibility-flags currently sit inside synthesiser §10 — *check that flagged thresholds in §8 plausibility checks 
match what synthesiser currently emits* +- §9 testing (unit per subagent + integration + stub mode) → Tasks 4, 6–9, 12, 17 +- §10 methodological caveats in synthesis → Task 14 +- §11 defaults — already encoded in Task 1 config keys and instrument YAML + +If §6.4 token-guard enforcement is needed for v1, add a small follow-up task that computes a projected-token estimate before `run_post` and returns 400 with `confirm=true` override — but the spec keeps this as a guard, not a blocker, so it can ship in v1.1. + +--- + +**Plan complete and saved to `docs/superpowers/plans/2026-05-23-stakeholder-interview-subagents.md`. Two execution options:** + +**1. Subagent-Driven (recommended)** — I dispatch a fresh subagent per task, review between tasks, fast iteration. + +**2. Inline Execution** — Execute tasks in this session using executing-plans, batch execution with checkpoints. + +**Which approach?** +