From cca67365b928a10682c1899df347f6d9ace84f41 Mon Sep 17 00:00:00 2001 From: Christian Moellmann Date: Sat, 23 May 2026 12:24:33 +0200 Subject: [PATCH] feat(interviews): Zep writer adapts add_activity/add_text_episode for per-agent + aggregate episodes Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/services/interviews/zep_writer.py | 65 +++++++++++++++++++ backend/tests/interviews/test_zep_writer.py | 28 ++++++++ 2 files changed, 93 insertions(+) create mode 100644 backend/app/services/interviews/zep_writer.py create mode 100644 backend/tests/interviews/test_zep_writer.py diff --git a/backend/app/services/interviews/zep_writer.py b/backend/app/services/interviews/zep_writer.py new file mode 100644 index 00000000..c4b6e971 --- /dev/null +++ b/backend/app/services/interviews/zep_writer.py @@ -0,0 +1,65 @@ +from __future__ import annotations +from typing import Any, Optional +from app.models.interview import ( + LikertResponse, QSortResponse, DelphiRatingResponse, ScenarioResponse, SubagentKind, +) + +class InterviewZepWriter: + """Mirrors `ZepGraphMemoryUpdater.add_activity` usage but for interview episodes. + + The real `ZepGraphMemoryUpdater` may expose `add_activity` (preferred) or a lower-level + text-episode method; this writer adapts to either via duck typing. + """ + def __init__(self, memory_updater, graph_id: str): + self.updater = memory_updater + self.graph_id = graph_id + + def _emit(self, text: str) -> None: + if hasattr(self.updater, "add_text_episode"): + self.updater.add_text_episode(self.graph_id, text) + elif hasattr(self.updater, "add_activity"): + self.updater.add_activity({"graph_id": self.graph_id, "text": text}) + else: + raise RuntimeError("memory_updater has neither add_text_episode nor add_activity") + + def _summarize_likert(self, r: LikertResponse) -> str: + mean_v = sum(r.responses.values()) / max(len(r.responses), 1) + top = sorted(r.responses.items(), key=lambda kv: -kv[1])[:3] + bot = sorted(r.responses.items(), key=lambda kv: kv[1])[:3] + return (f"mean={mean_v:.2f}; agrees with {[k for k,_ in top]}; " + f"disagrees with {[k for k,_ in bot]}") + + def _summarize_qsort(self, r: QSortResponse) -> str: + plus = [k for k, v in r.placements.items() if v >= 2] + minus = [k for k, v in r.placements.items() if v <= -2] + return f"+strongly:{plus}; -strongly:{minus}" + + def _summarize_scenario(self, r: ScenarioResponse) -> str: + parts = [f"{sid}: des={rt.desirability} plaus={rt.plausibility}" + for sid, rt in r.ratings.items()] + return "; ".join(parts) + + def write_per_agent( + self, subagent: SubagentKind, response: Any, agent_name: str, + phase: Optional[str] = None, + ) -> None: + if isinstance(response, LikertResponse): + phase = phase or response.phase.value + summary = self._summarize_likert(response) + elif isinstance(response, QSortResponse): + phase = phase or "T1" + summary = self._summarize_qsort(response) + elif isinstance(response, ScenarioResponse): + phase = phase or "T1" + summary = self._summarize_scenario(response) + elif isinstance(response, DelphiRatingResponse): + phase = phase or f"T1/R{response.round}" + summary = f"round={response.round}; {len(response.ratings)} themes rated" + else: + phase = phase or "T1" + summary = str(response)[:200] + text = f"Agent {agent_name} (interview/{subagent.value}/{phase}): {summary}" + self._emit(text) + + def write_aggregate(self, subagent: SubagentKind, summary: str) -> None: + self._emit(f"Interview aggregate ({subagent.value}): {summary}") diff --git a/backend/tests/interviews/test_zep_writer.py b/backend/tests/interviews/test_zep_writer.py new file mode 100644 index 00000000..661ef44b --- /dev/null +++ b/backend/tests/interviews/test_zep_writer.py @@ -0,0 +1,28 @@ +from app.models.interview import ( + LikertResponse, InterviewPhase, SubagentKind, +) +from app.services.interviews.zep_writer import InterviewZepWriter + +class _FakeMemoryUpdater: + def __init__(self): + self.events = [] + def add_activity(self, activity): + self.events.append(activity) + def add_text_episode(self, graph_id, text): + self.events.append({"graph_id": graph_id, "text": text}) + +def test_per_agent_episode_text(): + upd = _FakeMemoryUpdater() + w = InterviewZepWriter(memory_updater=upd, graph_id="g1") + r = LikertResponse(agent_id=42, phase=InterviewPhase.T1, + responses={"stk_1": 4, "gov_1": 3}, + confidence={"stk_1": 0.8, "gov_1": 0.7}) + w.write_per_agent(SubagentKind.LONGITUDINAL, r, agent_name="Fischer Müller") + assert any("Fischer Müller" in str(e) for e in upd.events) + assert any("longitudinal/T1" in str(e) for e in upd.events) + +def test_aggregate_episode(): + upd = _FakeMemoryUpdater() + w = InterviewZepWriter(memory_updater=upd, graph_id="g1") + w.write_aggregate(SubagentKind.SCENARIO, summary="S1 mean desirability 5.2; S2 mean 2.1") + assert any("S1 mean" in str(e) for e in upd.events)