diff --git a/backend/app/services/interview_orchestrator.py b/backend/app/services/interview_orchestrator.py new file mode 100644 index 00000000..ff0d2ad8 --- /dev/null +++ b/backend/app/services/interview_orchestrator.py @@ -0,0 +1,217 @@ +from __future__ import annotations +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from typing import Protocol +from app.models.interview import ( + InterviewPhase, SubagentKind, LikertResponse, QSortResponse, + DelphiOpenResponse, DelphiRatingResponse, ScenarioResponse, +) +from app.services.interviews.base import PersonaRecord +from app.services.interviews.longitudinal import LongitudinalSubagent, run_aggregate as longitudinal_aggregate +from app.services.interviews.diversity import DiversitySubagent, run_typology +from app.services.interviews.delphi import ( + DelphiSubagent, extract_themes, convergence_metrics, group_stats_from_r2, +) +from app.services.interviews.scenario import ScenarioSubagent, polarity_matrix +from app.services.interviews.storage import InterviewStore +from app.services.interviews.instrument_loader import freeze_snapshot + + +class PersonaProvider(Protocol): + def all(self) -> list[PersonaRecord]: ... + + +class InterviewOrchestrator: + def __init__( + self, llm, memory, personas: PersonaProvider, + instrument_dir: Path, store_root: Path, sim_id: str, + zep_writer, max_workers: int = 8, language: str = "de", + ): + self.llm = llm + self.memory = memory + self.personas = personas + self.instrument_dir = Path(instrument_dir) + self.store = InterviewStore(root=store_root, sim_id=sim_id) + self.zep_writer = zep_writer + self.max_workers = max_workers + self.language = language + # Freeze snapshot once per orchestrator lifetime + freeze_snapshot( + instruments={ + "longitudinal": self.instrument_dir / "longitudinal_v1.yaml", + "diversity": self.instrument_dir / "diversity_v1.yaml", + "delphi": self.instrument_dir / "delphi_v1.yaml", + "scenario": self.instrument_dir / "scenario_v1.yaml", + }, + out_path=self.store.base / "instruments_used.json", + ) + + # --- Generic per-agent runner --- + def _fan_out(self, run_dir, agent_fn, personas, audit_label): + ok: list = [] + failed: list[int] = [] + with ThreadPoolExecutor(max_workers=self.max_workers) as pool: + futures = {pool.submit(agent_fn, p): p for p in personas} + for fut in as_completed(futures): + p = futures[fut] + try: + out = fut.result() + ok.append(out) + self.store.append_response(run_dir, out) + except Exception as e: + failed.append(p.agent_id) + self.store.audit(run_dir, agent_id=p.agent_id, + event="agent_failed", detail=f"{audit_label}: {e!r}") + return ok, failed + + # --- Pre-phase (T0) --- + def run_pre(self) -> dict: + sub = LongitudinalSubagent(self.llm, self.memory, + self.instrument_dir / "longitudinal_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p, phase=InterviewPhase.T0), + self.personas.all(), audit_label="longitudinal_T0", + ) + for r in ok: + persona = next(p for p in self.personas.all() if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.LONGITUDINAL, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"longitudinal": {"n_responded": len(ok), "n_failed": len(failed), + "run_dir": str(run_dir)}} + + # --- Post-phase (T1) --- + def run_post(self) -> dict: + personas = self.personas.all() + out: dict = {} + with ThreadPoolExecutor(max_workers=4) as pool: + futures = { + "longitudinal": pool.submit(self._post_longitudinal, personas), + "diversity": pool.submit(self._post_diversity, personas), + "scenario": pool.submit(self._post_scenario, personas), + } + for name, fut in futures.items(): + try: out[name] = fut.result() + except Exception as e: out[name] = {"error": repr(e)} + # Delphi runs sequentially (R1 → R2 → R3) and uses the LLM for theme extraction + try: out["delphi"] = self._post_delphi(personas) + except Exception as e: out["delphi"] = {"error": repr(e)} + return out + + def _post_longitudinal(self, personas) -> dict: + sub = LongitudinalSubagent(self.llm, self.memory, + self.instrument_dir / "longitudinal_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.LONGITUDINAL) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p, phase=InterviewPhase.T1), + personas, audit_label="longitudinal_T1", + ) + # Aggregate using T0 + T1 + t0_path = self.store.latest_run(InterviewPhase.T0, SubagentKind.LONGITUDINAL) + t0_raw = self.store.read_responses(t0_path) if t0_path else [] + t0 = [LikertResponse(**d) for d in t0_raw] + agg = longitudinal_aggregate(t0, ok) + self.store.write_aggregate(run_dir, agg) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.LONGITUDINAL, r, persona.name) + except Exception: pass + try: self.zep_writer.write_aggregate(SubagentKind.LONGITUDINAL, + f"n_paired={agg['n_paired']}") + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_diversity(self, personas) -> dict: + sub = DiversitySubagent(self.llm, self.memory, + self.instrument_dir / "diversity_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.DIVERSITY) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p), personas, audit_label="diversity", + ) + typology = run_typology(ok) + self.store.write_named(run_dir, "typology.json", typology) + self.store.write_aggregate(run_dir, {"n": len(ok), "n_failed": len(failed), + "clusters": typology["clusters"]}) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.DIVERSITY, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_scenario(self, personas) -> dict: + sub = ScenarioSubagent(self.llm, self.memory, + self.instrument_dir / "scenario_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.SCENARIO) + ok, failed = self._fan_out( + run_dir, lambda p: sub.administer(p), personas, audit_label="scenario", + ) + matrix = polarity_matrix(ok) + self.store.write_named(run_dir, "polarity_matrix.json", matrix) + self.store.write_aggregate(run_dir, {"n": len(ok), "n_failed": len(failed), + "polarity": matrix}) + for r in ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.SCENARIO, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_responded": len(ok), "n_failed": len(failed), "run_dir": str(run_dir)} + + def _post_delphi(self, personas) -> dict: + sub = DelphiSubagent(self.llm, self.memory, + self.instrument_dir / "delphi_v1.yaml", + language=self.language) + run_dir = self.store.start_run(InterviewPhase.T1, SubagentKind.DELPHI) + # Round 1 + r1_ok, r1_failed = self._fan_out( + run_dir, lambda p: sub.administer_round1(p), personas, audit_label="delphi_r1", + ) + # Move all R1 responses into a dedicated file + for r in r1_ok: self.store.append_jsonl(run_dir, "round1_themes.jsonl", r) + # Extract themes from R1 + themes = extract_themes(r1_ok, llm=self.llm) + self.store.write_named(run_dir, "themes.json", {"themes": themes}) + # Round 2 + r2_ok, r2_failed = self._fan_out( + run_dir, lambda p: sub.administer_round2(p, themes), + [p for p in personas if p.agent_id in {r.agent_id for r in r1_ok}], + audit_label="delphi_r2", + ) + for r in r2_ok: self.store.append_jsonl(run_dir, "round2_ratings.jsonl", r) + gstats = group_stats_from_r2(r2_ok) + # Round 3 + r2_by = {r.agent_id: r for r in r2_ok} + r3_personas = [p for p in personas if p.agent_id in r2_by] + def r3_call(p): return sub.administer_round3(p, themes, gstats, r2_by[p.agent_id]) + r3_ok, r3_failed = self._fan_out(run_dir, r3_call, r3_personas, audit_label="delphi_r3") + for r in r3_ok: self.store.append_jsonl(run_dir, "round3_revisions.jsonl", r) + # Convergence + conv = convergence_metrics(r2_ok, r3_ok) + self.store.write_named(run_dir, "convergence.json", conv) + self.store.write_aggregate(run_dir, { + "n_r1": len(r1_ok), "n_r2": len(r2_ok), "n_r3": len(r3_ok), + "n_failed_r1": len(r1_failed), "n_failed_r2": len(r2_failed), "n_failed_r3": len(r3_failed), + "themes": themes, + }) + for r in r3_ok: + persona = next(p for p in personas if p.agent_id == r.agent_id) + try: self.zep_writer.write_per_agent(SubagentKind.DELPHI, r, persona.name) + except Exception: pass + self.store.mark_latest(run_dir) + return {"n_r1": len(r1_ok), "n_r2": len(r2_ok), "n_r3": len(r3_ok), + "run_dir": str(run_dir)} + + # --- Re-run a single subagent --- + def rerun(self, subagent: SubagentKind) -> dict: + personas = self.personas.all() + if subagent == SubagentKind.LONGITUDINAL: return {"longitudinal": self._post_longitudinal(personas)} + if subagent == SubagentKind.DIVERSITY: return {"diversity": self._post_diversity(personas)} + if subagent == SubagentKind.SCENARIO: return {"scenario": self._post_scenario(personas)} + if subagent == SubagentKind.DELPHI: return {"delphi": self._post_delphi(personas)} + raise ValueError(f"unknown subagent {subagent}") diff --git a/backend/tests/interviews/test_orchestrator.py b/backend/tests/interviews/test_orchestrator.py new file mode 100644 index 00000000..323c4361 --- /dev/null +++ b/backend/tests/interviews/test_orchestrator.py @@ -0,0 +1,64 @@ +from pathlib import Path +import pytest +from app.models.interview import InterviewPhase, SubagentKind +from app.services.interviews.base import PersonaRecord, MemoryDigest +from app.services.interview_orchestrator import ( + InterviewOrchestrator, PersonaProvider, +) + +INST_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments" + +class _Mem: + def get_digest(self, agent_id, max_chars=2000): + return MemoryDigest(text="x", available=True) + +class _LLM: + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + sys_text = next((m["content"] for m in messages if m["role"] == "system"), "") + if "longitudinal" in sys_text or "stk_" in (messages[-1].get("content") or ""): + return { + "responses": {k: 3 for k in ("stk_1","stk_2","stk_3","gov_1","gov_2","gov_3", + "mkt_1","mkt_2","mkt_3","clm_1","clm_2","clm_3")}, + "confidence": {}, "open_comment": "ok", + } + return {} + +class _Personas(PersonaProvider): + def __init__(self, n=3): + self._items = [PersonaRecord(agent_id=i, name=f"A{i}", persona="p") for i in range(n)] + def all(self): return list(self._items) + +class _NoopZep: + def write_per_agent(self, *a, **kw): pass + def write_aggregate(self, *a, **kw): pass + +def test_pre_phase_runs_longitudinal_only(tmp_path): + orch = InterviewOrchestrator( + llm=_LLM(), memory=_Mem(), personas=_Personas(3), + instrument_dir=INST_DIR, store_root=tmp_path, sim_id="sim1", + zep_writer=_NoopZep(), max_workers=2, + ) + result = orch.run_pre() + assert result["longitudinal"]["n_responded"] == 3 + assert "diversity" not in result # only longitudinal in pre-phase + +def test_partial_failure_does_not_kill_run(tmp_path): + class _FlakyLLM: + def __init__(self): self.n = 0 + def chat_json(self, messages, temperature=0.0, max_tokens=None, **kw): + self.n += 1 + if self.n % 2 == 0: + raise RuntimeError("simulated LLM 5xx") + return { + "responses": {k: 3 for k in ("stk_1","stk_2","stk_3","gov_1","gov_2","gov_3", + "mkt_1","mkt_2","mkt_3","clm_1","clm_2","clm_3")}, + "confidence": {}, "open_comment": "ok", + } + orch = InterviewOrchestrator( + llm=_FlakyLLM(), memory=_Mem(), personas=_Personas(4), + instrument_dir=INST_DIR, store_root=tmp_path, sim_id="sim2", + zep_writer=_NoopZep(), max_workers=1, + ) + result = orch.run_pre() + assert result["longitudinal"]["n_responded"] < 4 + assert result["longitudinal"]["n_failed"] > 0