From 6e1489fe08c99ed5c49c151cf44ac66e7b4f05c6 Mon Sep 17 00:00:00 2001 From: Christian Moellmann Date: Sat, 23 May 2026 13:27:47 +0200 Subject: [PATCH] fix(interviews): wire Zep updater/memory/hooks correctly for production runs (C1-C5) Five tightly-coupled fixes for bugs that were causing the interview subsystem to silently degrade in production: - C1+C2: `_build_orchestrator` now resolves `graph_id` from `SimulationManager().get_simulation(sim_id).graph_id` (the real persisted state) instead of a `graph_id.txt` that nothing in the codebase writes. `ZepGraphMemoryUpdater(graph_id=...)` is now called with its required `graph_id` argument; the bare `try/except Exception` that was swallowing the TypeError is replaced with an explicit fallback that logs the failure. - C3: `SimulationManager._on_ready_hooks` / `_on_completed_hooks` are now class-level (mirroring `SimulationRunner._on_completed_callbacks`). Hooks registered at app startup now survive across the per-request `SimulationManager()` instances created by the Flask API, so the T0 longitudinal auto-survey actually fires. - C4: `ZepGraphMemoryUpdater` gains an explicit `add_text_episode(graph_id, text)` method for synchronous text writes. `InterviewZepWriter._emit` no longer silently falls back to a dict-shaped `add_activity` call that the real implementation rejects (its `add_activity` requires an `AgentActivity` dataclass). - C5: `FileSystemPersonaProvider.agent_to_entity()` builds an `{agent_id: zep_entity_uuid}` map from the persisted profile files; the map is now passed to `ZepMemoryProvider` so `get_entity_with_context` is called with real Zep UUIDs instead of `str(agent_id)`. To make this work, `OasisProfileGenerator._save_reddit_json` and `_save_twitter_csv` now persist `source_entity_uuid` (Reddit JSON: optional field; Twitter CSV: appended column). Tests: 51 unit + 2 integration pass (was 40 + 2). 
New tests lock in each fix: - `test_hooks_survive_across_instances`, `test_register_via_instance_also_lands_on_class` (C3) - `test_build_orchestrator_reads_graph_id_from_state` (C1+C2+C5) - `test_build_orchestrator_falls_back_when_state_missing` (C1+C2) - `test_emit_uses_add_text_episode_with_graph_id`, `test_emit_raises_when_updater_lacks_add_text_episode`, `test_real_updater_exposes_add_text_episode` (C4) - `test_agent_to_entity_from_reddit_json`, `test_agent_to_entity_empty_when_no_field`, `test_agent_to_entity_falls_back_to_twitter_csv`, `test_agent_to_entity_reddit_takes_precedence` (C5) Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/__init__.py | 15 +-- backend/app/api/interview.py | 102 +++++++++++++--- backend/app/services/interviews/adapters.py | 43 +++++++ backend/app/services/interviews/zep_writer.py | 15 ++- .../app/services/oasis_profile_generator.py | 29 +++-- backend/app/services/simulation_manager.py | 47 +++++--- .../app/services/zep_graph_memory_updater.py | 38 ++++++ backend/tests/interviews/test_adapters.py | 75 ++++++++++++ .../tests/interviews/test_api_interview.py | 113 ++++++++++++++++++ .../tests/interviews/test_simulation_hooks.py | 56 +++++++++ backend/tests/interviews/test_zep_writer.py | 55 ++++++++- 11 files changed, 526 insertions(+), 62 deletions(-) diff --git a/backend/app/__init__.py b/backend/app/__init__.py index d3a6d543..fdc49112 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -48,16 +48,17 @@ def create_app(config_class=Config): if should_log_startup: logger.info("已注册模拟进程清理函数") - # Install interview lifecycle hooks on a singleton SimulationManager. - # The singleton's _notify_on_completed is also wired into SimulationRunner - # so that the runner's monitor thread fires the completed hooks when a - # simulation process exits successfully. + # Install interview lifecycle hooks on the SimulationManager class. + # Hooks are stored on the class itself (not on a particular instance), so + # any fresh `SimulationManager()` constructed later (e.g. 
per request in + # the Flask API) will see them. We still bridge `_notify_on_completed` + # into SimulationRunner via a transient instance so the runner's monitor + # thread fires the completed hooks when a simulation process exits. from .services.simulation_manager import SimulationManager from .services.interviews.lifecycle import install_hooks - _simulation_manager_singleton = SimulationManager() - install_hooks(_simulation_manager_singleton) - SimulationRunner.register_on_completed(_simulation_manager_singleton._notify_on_completed) + install_hooks(SimulationManager) + SimulationRunner.register_on_completed(SimulationManager()._notify_on_completed) if should_log_startup: logger.info("已安装面试生命周期钩子") diff --git a/backend/app/api/interview.py b/backend/app/api/interview.py index 993fda17..e638aaab 100644 --- a/backend/app/api/interview.py +++ b/backend/app/api/interview.py @@ -12,9 +12,31 @@ from app.services.interview_orchestrator import InterviewOrchestrator from app.services.interview_synthesizer import InterviewSynthesizer from app.services.interviews.storage import InterviewStore from app.utils.llm_client import LLMClient +from app.utils.logger import get_logger from . import interview_bp +logger = get_logger(__name__) + + +class _NullUpdater: + """No-op stand-in for ``ZepGraphMemoryUpdater`` used when Zep is unavailable. + + Exposes ``add_text_episode`` so ``InterviewZepWriter._emit`` succeeds silently — + the interview pipeline still produces local artefacts; Zep just isn't updated. 
+ """ + + def add_text_episode(self, graph_id, text): # noqa: ARG002 - matches real API + return None + + +class _NullMemory: + """Fallback memory provider that always reports unavailable digests.""" + + def get_digest(self, agent_id, max_chars=2000): # noqa: ARG002 - matches Protocol + from app.services.interviews.base import MemoryDigest + return MemoryDigest(text="[memory unavailable]", available=False) + _TASKS: dict[str, dict] = {} _LOCK = threading.Lock() @@ -25,30 +47,72 @@ def _uploads_root() -> Path: return Path(getattr(Config, "UPLOADS_DIR", "uploads")) +def _load_graph_id(sim_id: str) -> str: + """Read the Zep ``graph_id`` for a simulation from its persisted state. + + The graph_id is written by ``SimulationManager`` into + ``uploads/simulations/{sim_id}/state.json``. Returns ``""`` if the state + file is missing or unreadable — callers should treat empty graph_id as + "Zep unavailable" and fall back to the null memory/writer path. + """ + try: + from app.services.simulation_manager import SimulationManager + state = SimulationManager().get_simulation(sim_id) + if state and state.graph_id: + return state.graph_id + except Exception as e: # pragma: no cover - defensive + logger.warning(f"_load_graph_id({sim_id}) failed: {e!r}") + return "" + + def _build_orchestrator(sim_id: str) -> InterviewOrchestrator: sim_dir = _uploads_root() / "simulations" / sim_id reddit = sim_dir / "reddit_profiles.json" twitter = sim_dir / "twitter_profiles.csv" - personas = FileSystemPersonaProvider(reddit_path=reddit if reddit.exists() else None, - twitter_path=twitter if twitter.exists() else None) - # Zep memory + writer: best-effort; in stub/test mode the writer no-ops on exceptions - class _NullUpdater: - def add_text_episode(self, *a, **kw): return None - try: - from app.services.zep_entity_reader import ZepEntityReader - from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater - graph_id = (sim_dir / "graph_id.txt").read_text().strip() if (sim_dir / 
"graph_id.txt").exists() else "" - reader = ZepEntityReader() - updater = ZepGraphMemoryUpdater() - memory = ZepMemoryProvider(reader, graph_id=graph_id) - zep_writer = InterviewZepWriter(memory_updater=updater, graph_id=graph_id) - except Exception: - class _Mem: - def get_digest(self, agent_id, max_chars=2000): - from app.services.interviews.base import MemoryDigest - return MemoryDigest(text="[memory unavailable]", available=False) - memory = _Mem() + personas = FileSystemPersonaProvider( + reddit_path=reddit if reddit.exists() else None, + twitter_path=twitter if twitter.exists() else None, + ) + # Build agent_id -> Zep entity uuid map from the persisted profile files. + agent_to_entity = personas.agent_to_entity() + + # Resolve the graph_id from the simulation's persisted state — NOT from a + # ``graph_id.txt`` (nothing in the codebase writes such a file). + graph_id = _load_graph_id(sim_id) + + memory: object + zep_writer: InterviewZepWriter + if not graph_id: + logger.warning( + f"interview: no graph_id for sim {sim_id} — Zep memory/writer disabled " + "(simulation state missing or graph_id empty)" + ) + memory = _NullMemory() zep_writer = InterviewZepWriter(memory_updater=_NullUpdater(), graph_id="") + else: + try: + from app.services.zep_entity_reader import ZepEntityReader + from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater + + reader = ZepEntityReader() + updater = ZepGraphMemoryUpdater(graph_id=graph_id) + memory = ZepMemoryProvider( + reader, graph_id=graph_id, agent_to_entity=agent_to_entity + ) + zep_writer = InterviewZepWriter(memory_updater=updater, graph_id=graph_id) + if not agent_to_entity: + logger.warning( + f"interview: empty agent_to_entity map for sim {sim_id} — " + "memory digests will be unavailable. Check that profile files " + "include `source_entity_uuid`." 
+ ) + except Exception as e: + logger.warning( + f"interview: Zep init failed for sim {sim_id} ({e!r}); " + "falling back to null memory/writer" + ) + memory = _NullMemory() + zep_writer = InterviewZepWriter(memory_updater=_NullUpdater(), graph_id="") llm = LLMClient(api_key=Config.LLM_API_KEY, base_url=Config.LLM_BASE_URL, model=Config.LLM_MODEL_NAME) return InterviewOrchestrator( diff --git a/backend/app/services/interviews/adapters.py b/backend/app/services/interviews/adapters.py index 94431fe9..06d05e94 100644 --- a/backend/app/services/interviews/adapters.py +++ b/backend/app/services/interviews/adapters.py @@ -54,6 +54,49 @@ class FileSystemPersonaProvider: twitter = [p for p in self._load_twitter() if p.agent_id not in seen] return reddit + twitter + def agent_to_entity(self) -> dict[int, str]: + """Build the ``{agent_id: zep_entity_uuid}`` map from the persisted profile files. + + Both writers (``oasis_profile_generator._save_reddit_json`` and + ``_save_twitter_csv``) emit ``source_entity_uuid`` per agent. Reddit takes + precedence; rows with a missing/blank uuid are skipped. + Returns an empty dict if neither file is present or no row has the field. 
+ """ + mapping: dict[int, str] = {} + + # Reddit JSON + if self.reddit_path and self.reddit_path.exists(): + try: + rows = json.loads(self.reddit_path.read_text(encoding="utf-8")) + for row in rows: + uid = row.get("user_id") + uuid_ = row.get("source_entity_uuid") + if uid is None or not uuid_: + continue + mapping[int(uid)] = str(uuid_) + except (json.JSONDecodeError, ValueError, TypeError): + pass + + # Twitter CSV (only fills agents not already mapped) + if self.twitter_path and self.twitter_path.exists(): + try: + with self.twitter_path.open("r", encoding="utf-8", newline="") as f: + for row in csv.DictReader(f): + uid = row.get("user_id") + uuid_ = row.get("source_entity_uuid") + if not uid or not uuid_: + continue + try: + uid_int = int(uid) + except (TypeError, ValueError): + continue + if uid_int not in mapping: + mapping[uid_int] = str(uuid_) + except OSError: + pass + + return mapping + class ZepMemoryProvider: """Builds a bounded memory digest per agent from Zep entity context. diff --git a/backend/app/services/interviews/zep_writer.py b/backend/app/services/interviews/zep_writer.py index c4b6e971..fdd9f185 100644 --- a/backend/app/services/interviews/zep_writer.py +++ b/backend/app/services/interviews/zep_writer.py @@ -5,10 +5,12 @@ from app.models.interview import ( ) class InterviewZepWriter: - """Mirrors `ZepGraphMemoryUpdater.add_activity` usage but for interview episodes. + """Writes interview episodes (per-agent responses, aggregates) to a Zep graph. - The real `ZepGraphMemoryUpdater` may expose `add_activity` (preferred) or a lower-level - text-episode method; this writer adapts to either via duck typing. + Expects ``memory_updater`` to expose ``add_text_episode(graph_id, text)`` — that + is the method the real ``ZepGraphMemoryUpdater`` provides for synchronous text + writes outside the agent-activity batch pipeline. A no-op shim with the same + method is acceptable for tests and stub mode. 
""" def __init__(self, memory_updater, graph_id: str): self.updater = memory_updater @@ -17,10 +19,11 @@ class InterviewZepWriter: def _emit(self, text: str) -> None: if hasattr(self.updater, "add_text_episode"): self.updater.add_text_episode(self.graph_id, text) - elif hasattr(self.updater, "add_activity"): - self.updater.add_activity({"graph_id": self.graph_id, "text": text}) else: - raise RuntimeError("memory_updater has neither add_text_episode nor add_activity") + raise RuntimeError( + "memory_updater is missing add_text_episode(graph_id, text); " + "InterviewZepWriter requires the explicit text-episode API." + ) def _summarize_likert(self, r: LikertResponse) -> str: mean_v = sum(r.responses.values()) / max(len(r.responses), 1) diff --git a/backend/app/services/oasis_profile_generator.py b/backend/app/services/oasis_profile_generator.py index 7704a627..9360e18c 100644 --- a/backend/app/services/oasis_profile_generator.py +++ b/backend/app/services/oasis_profile_generator.py @@ -1090,11 +1090,13 @@ class OasisProfileGenerator: with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) - - # 写入OASIS要求的表头 - headers = ['user_id', 'name', 'username', 'user_char', 'description'] + + # 写入表头:OASIS要求的5列 + 额外的source_entity_uuid列(反向链接到Zep实体)。 + # OASIS按列名读取,额外的列不会影响其行为,但允许下游(面试子系统等) + # 重建 agent_id -> Zep entity uuid 的映射。 + headers = ['user_id', 'name', 'username', 'user_char', 'description', 'source_entity_uuid'] writer.writerow(headers) - + # 写入数据行 for idx, profile in enumerate(profiles): # user_char: 完整人设(bio + persona),用于LLM系统提示 @@ -1103,16 +1105,17 @@ class OasisProfileGenerator: user_char = f"{profile.bio} {profile.persona}" # 处理换行符(CSV中用空格替代) user_char = user_char.replace('\n', ' ').replace('\r', ' ') - + # description: 简短简介,用于外部显示 description = profile.bio.replace('\n', ' ').replace('\r', ' ') - + row = [ idx, # user_id: 从0开始的顺序ID profile.name, # name: 真实姓名 profile.user_name, # username: 用户名 user_char, # user_char: 完整人设(内部LLM使用) - 
description # description: 简短简介(外部显示) + description, # description: 简短简介(外部显示) + profile.source_entity_uuid or "", # source_entity_uuid: Zep实体UUID ] writer.writerow(row) @@ -1184,12 +1187,18 @@ class OasisProfileGenerator: item["profession"] = profile.profession if profile.interested_topics: item["interested_topics"] = profile.interested_topics - + # source_entity_uuid: 反向链接到Zep实体,下游(面试子系统等)需要此映射以 + # 在Zep图谱中查找Agent的上下文。仅在存在时写入。 + if profile.source_entity_uuid: + item["source_entity_uuid"] = profile.source_entity_uuid + if profile.source_entity_type: + item["source_entity_type"] = profile.source_entity_type + data.append(item) - + with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) - + logger.info(f"已保存 {len(profiles)} 个Reddit Profile到 {file_path} (JSON格式,包含user_id字段)") # 保留旧方法名作为别名,保持向后兼容 diff --git a/backend/app/services/simulation_manager.py b/backend/app/services/simulation_manager.py index 5fe962f6..50b7890a 100644 --- a/backend/app/services/simulation_manager.py +++ b/backend/app/services/simulation_manager.py @@ -115,30 +115,33 @@ class SimulationState: class SimulationManager: """ 模拟管理器 - + 核心功能: 1. 从Zep图谱读取实体并过滤 2. 生成OASIS Agent Profile 3. 使用LLM智能生成模拟配置参数 4. 准备预设脚本所需的所有文件 """ - + # 模拟数据存储目录 SIMULATION_DATA_DIR = os.path.join( - os.path.dirname(__file__), + os.path.dirname(__file__), '../../uploads/simulations' ) - + + # Class-level hook registries so callbacks survive across instances. + # The Flask API endpoints construct fresh `SimulationManager()` instances per request, + # while lifecycle hooks are registered once at app startup — storing the lists on the + # instance would silently drop those hooks on every request. 
+ _on_ready_hooks: list = [] + _on_completed_hooks: list = [] + def __init__(self): # 确保目录存在 os.makedirs(self.SIMULATION_DATA_DIR, exist_ok=True) # 内存中的模拟状态缓存 self._simulations: Dict[str, SimulationState] = {} - - # Lifecycle hook registries - self._on_ready_hooks: list = [] - self._on_completed_hooks: list = [] def _get_simulation_dir(self, simulation_id: str) -> str: """获取模拟数据目录""" @@ -196,20 +199,30 @@ class SimulationManager: return state # ------------------------------------------------------------------ - # Lifecycle hook registration + # Lifecycle hook registration (class-level — see the comment on the hook registries above) # ------------------------------------------------------------------ - def register_on_ready(self, fn) -> None: - """Register a callback invoked when a simulation transitions to READY.""" - self._on_ready_hooks.append(fn) + @classmethod + def register_on_ready(cls, fn) -> None: + """Register a callback invoked when a simulation transitions to READY. - def register_on_completed(self, fn) -> None: - """Register a callback invoked when a simulation transitions to COMPLETED.""" - self._on_completed_hooks.append(fn) + Class-level so hooks registered at app startup remain visible to every + SimulationManager() instance constructed later (e.g. per-request in Flask). 
+ """ + cls._on_completed_hooks.append(fn) def _notify_on_ready(self, state: "SimulationState") -> None: """Invoke all on_ready hooks; exceptions are isolated per hook.""" - for fn in list(self._on_ready_hooks): + for fn in list(type(self)._on_ready_hooks): try: fn(state) except Exception as e: @@ -217,7 +230,7 @@ class SimulationManager: def _notify_on_completed(self, state: "SimulationState") -> None: """Invoke all on_completed hooks; exceptions are isolated per hook.""" - for fn in list(self._on_completed_hooks): + for fn in list(type(self)._on_completed_hooks): try: fn(state) except Exception as e: diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py index e034fee2..86a4e1e2 100644 --- a/backend/app/services/zep_graph_memory_updater.py +++ b/backend/app/services/zep_graph_memory_updater.py @@ -337,6 +337,44 @@ class ZepGraphMemoryUpdater: self._total_activities += 1 logger.debug(f"添加活动到Zep队列: {activity.agent_name} - {activity.action_type}") + def add_text_episode(self, graph_id: str, text: str) -> None: + """ + 直接将一段文本写入Zep图谱(同步发送,不经过批量队列) + + 用于面试子系统(InterviewZepWriter)等需要立即写入、不属于 + agent活动流水线的场景。绕过 _send_batch_activities 的批量逻辑, + 但仍带重试。 + + Args: + graph_id: 目标图谱ID(允许覆盖 self.graph_id,便于多图场景) + text: 要发送的文本内容 + """ + if not text: + return + target_graph_id = graph_id or self.graph_id + if not target_graph_id: + logger.warning("add_text_episode 调用时未指定graph_id,跳过") + return + + for attempt in range(self.MAX_RETRIES): + try: + self.client.graph.add( + graph_id=target_graph_id, + type="text", + data=text, + ) + self._total_sent += 1 + self._total_items_sent += 1 + logger.debug(f"add_text_episode 发送成功 (graph={target_graph_id}, len={len(text)})") + return + except Exception as e: + if attempt < self.MAX_RETRIES - 1: + logger.warning(f"add_text_episode 失败 (尝试 {attempt + 1}/{self.MAX_RETRIES}): {e}") + time.sleep(self.RETRY_DELAY * (attempt + 1)) + else: + logger.error(f"add_text_episode 
失败,已重试{self.MAX_RETRIES}次: {e}") + self._failed_count += 1 + def add_activity_from_dict(self, data: Dict[str, Any], platform: str): """ 从字典数据添加活动 diff --git a/backend/tests/interviews/test_adapters.py b/backend/tests/interviews/test_adapters.py index ab7dee2e..977d5997 100644 --- a/backend/tests/interviews/test_adapters.py +++ b/backend/tests/interviews/test_adapters.py @@ -46,3 +46,78 @@ def test_zep_memory_provider_truncates_to_max_chars(): d = prov.get_digest(5, max_chars=300) assert d.available is True assert len(d.text) <= 300 + + +def test_agent_to_entity_from_reddit_json(tmp_path): + """C5: ``FileSystemPersonaProvider.agent_to_entity()`` must reconstruct the + ``{agent_id: zep_entity_uuid}`` map from a reddit_profiles.json that + includes ``source_entity_uuid``. + """ + data = [ + {"user_id": 0, "user_name": "fischer1", "name": "Fischer Müller", + "persona": "p", "profession": "fisher", + "source_entity_uuid": "uuid-zero"}, + {"user_id": 1, "user_name": "ngo1", "name": "Ines NGO", + "persona": "p", "profession": "ngo_staff", + "source_entity_uuid": "uuid-one"}, + # Row with no uuid must be skipped. + {"user_id": 2, "user_name": "gov1", "name": "Gov Agent", + "persona": "p", "profession": "official"}, + ] + p = tmp_path / "reddit_profiles.json" + p.write_text(json.dumps(data), encoding="utf-8") + + provider = FileSystemPersonaProvider(reddit_path=p, twitter_path=None) + mapping = provider.agent_to_entity() + + assert mapping == {0: "uuid-zero", 1: "uuid-one"} + # Map values are strings, keys are ints. 
+ for k, v in mapping.items(): + assert isinstance(k, int) + assert isinstance(v, str) + + +def test_agent_to_entity_empty_when_no_field(tmp_path): + """C5: if no row has ``source_entity_uuid``, return an empty dict — not + a crash, not partial garbage.""" + data = [{"user_id": 0, "user_name": "u", "name": "A", "persona": "p"}] + p = tmp_path / "reddit_profiles.json" + p.write_text(json.dumps(data), encoding="utf-8") + provider = FileSystemPersonaProvider(reddit_path=p, twitter_path=None) + assert provider.agent_to_entity() == {} + + +def test_agent_to_entity_falls_back_to_twitter_csv(tmp_path): + """C5: when only twitter_profiles.csv exists, the helper must still + extract uuids from the CSV's ``source_entity_uuid`` column. + """ + p = tmp_path / "twitter_profiles.csv" + with p.open("w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow(["user_id", "name", "username", "user_char", "description", "source_entity_uuid"]) + writer.writerow([0, "A0", "u0", "char", "desc", "uuid-zero"]) + writer.writerow([1, "A1", "u1", "char", "desc", ""]) # skipped (blank uuid) + writer.writerow([2, "A2", "u2", "char", "desc", "uuid-two"]) + + provider = FileSystemPersonaProvider(reddit_path=None, twitter_path=p) + assert provider.agent_to_entity() == {0: "uuid-zero", 2: "uuid-two"} + + +def test_agent_to_entity_reddit_takes_precedence(tmp_path): + """C5: when both files exist, Reddit JSON wins; Twitter CSV only fills + agents not already mapped.""" + reddit = tmp_path / "reddit_profiles.json" + reddit.write_text(json.dumps([ + {"user_id": 0, "user_name": "u0", "name": "A0", "persona": "p", + "source_entity_uuid": "reddit-zero"}, + ]), encoding="utf-8") + + twitter = tmp_path / "twitter_profiles.csv" + with twitter.open("w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow(["user_id", "name", "username", "user_char", "description", "source_entity_uuid"]) + writer.writerow([0, "A0", "u0", "char", "desc", "twitter-zero"]) # 
ignored + writer.writerow([1, "A1", "u1", "char", "desc", "twitter-one"]) # used + + provider = FileSystemPersonaProvider(reddit_path=reddit, twitter_path=twitter) + assert provider.agent_to_entity() == {0: "reddit-zero", 1: "twitter-one"} diff --git a/backend/tests/interviews/test_api_interview.py b/backend/tests/interviews/test_api_interview.py index baad634b..7e55d627 100644 --- a/backend/tests/interviews/test_api_interview.py +++ b/backend/tests/interviews/test_api_interview.py @@ -40,3 +40,116 @@ def test_unknown_subagent_returns_400(client): res = client.post("/api/interview/sim_test/rerun", json={"subagent": "nonsense"}) assert res.status_code == 400 + + +def test_build_orchestrator_reads_graph_id_from_state(tmp_path, monkeypatch): + """C1+C2: ``_build_orchestrator`` must resolve the Zep graph_id from + ``state.json`` (written by ``SimulationManager``), not from the + nonexistent ``graph_id.txt``. The graph_id then must reach the + ``InterviewZepWriter`` instead of being silently swallowed. + """ + monkeypatch.setenv("LLM_STUB_MODE", "true") + monkeypatch.setenv("UPLOADS_DIR", str(tmp_path)) + monkeypatch.setenv("ZEP_API_KEY", "test-fake-key") + from app.config import Config + Config.LLM_STUB_MODE = True + Config.UPLOADS_DIR = str(tmp_path) + Config.ZEP_API_KEY = "test-fake-key" + + # SimulationManager's data dir is class-level — point it at tmp_path. + from app.services.simulation_manager import SimulationManager + sim_root = tmp_path / "simulations" + sim_root.mkdir(parents=True, exist_ok=True) + monkeypatch.setattr(SimulationManager, "SIMULATION_DATA_DIR", str(sim_root)) + + sim_id = "sim_graphid" + sim_dir = sim_root / sim_id + sim_dir.mkdir(parents=True) + # Seed a profile file so FileSystemPersonaProvider can work. 
+ (sim_dir / "reddit_profiles.json").write_text( + json.dumps([ + {"user_id": 0, "user_name": "u0", "name": "A0", + "persona": "p", "profession": "fisher", + "source_entity_uuid": "uuid-zero"}, + {"user_id": 1, "user_name": "u1", "name": "A1", + "persona": "p", "profession": "fisher", + "source_entity_uuid": "uuid-one"}, + ]), + encoding="utf-8", + ) + # Seed state.json with the graph_id. + state_doc = { + "simulation_id": sim_id, + "project_id": "p", + "graph_id": "graph-from-state", + "status": "ready", + "enable_twitter": False, + "enable_reddit": True, + } + (sim_dir / "state.json").write_text(json.dumps(state_doc), encoding="utf-8") + + # Patch ZepGraphMemoryUpdater + ZepEntityReader so we don't hit the network. + import app.services.zep_graph_memory_updater as zgmu + import app.services.zep_entity_reader as zer + + class _FakeUpdater: + def __init__(self, graph_id, api_key=None): + self.graph_id = graph_id + + def add_text_episode(self, graph_id, text): + return None + + class _FakeReader: + def __init__(self, api_key=None): + pass + + def get_entity_with_context(self, graph_id, entity_uuid): + return None + + monkeypatch.setattr(zgmu, "ZepGraphMemoryUpdater", _FakeUpdater) + monkeypatch.setattr(zer, "ZepEntityReader", _FakeReader) + + from app.api.interview import _build_orchestrator + + orch = _build_orchestrator(sim_id) + assert orch.zep_writer.graph_id == "graph-from-state" + # Updater on the writer must be the real (or fake) ZepGraphMemoryUpdater path, + # NOT the null updater — i.e. its graph_id must match. + assert getattr(orch.zep_writer.updater, "graph_id", None) == "graph-from-state" + + # ZepMemoryProvider must have received the agent_to_entity map (C5). 
+ assert hasattr(orch.memory, "map") + assert orch.memory.map == {0: "uuid-zero", 1: "uuid-one"} + + +def test_build_orchestrator_falls_back_when_state_missing(tmp_path, monkeypatch): + """C1+C2: when ``state.json`` is missing, the orchestrator must still be + constructed with the null updater/memory path (not crash, not silently + pass a bare ``ZepGraphMemoryUpdater()`` that would error out). + """ + monkeypatch.setenv("LLM_STUB_MODE", "true") + monkeypatch.setenv("UPLOADS_DIR", str(tmp_path)) + from app.config import Config + Config.LLM_STUB_MODE = True + Config.UPLOADS_DIR = str(tmp_path) + + from app.services.simulation_manager import SimulationManager + sim_root = tmp_path / "simulations" + sim_root.mkdir(parents=True, exist_ok=True) + monkeypatch.setattr(SimulationManager, "SIMULATION_DATA_DIR", str(sim_root)) + + sim_id = "sim_no_state" + sim_dir = sim_root / sim_id + sim_dir.mkdir(parents=True) + (sim_dir / "reddit_profiles.json").write_text( + json.dumps([{"user_id": 0, "user_name": "u0", "name": "A0", + "persona": "p", "profession": "fisher"}]), + encoding="utf-8", + ) + + from app.api.interview import _build_orchestrator + + orch = _build_orchestrator(sim_id) + assert orch.zep_writer.graph_id == "" + # Null updater path: writer must still respond to _emit without raising. + orch.zep_writer._emit("hello") diff --git a/backend/tests/interviews/test_simulation_hooks.py b/backend/tests/interviews/test_simulation_hooks.py index cef304f2..52852d28 100644 --- a/backend/tests/interviews/test_simulation_hooks.py +++ b/backend/tests/interviews/test_simulation_hooks.py @@ -7,11 +7,27 @@ NOTE ON SHAPE DIVERGENCE vs. original plan spec: - The COMPLETED transition lives in simulation_runner.py (SimulationRunner._monitor_simulation), not in simulation_manager.py. The _notify_on_completed hook is registered on SimulationManager and the production insertion point for COMPLETED is documented in DONE_WITH_CONCERNS. 
+ +Hooks are stored on the class (C3 fix), so each test snapshots/restores the +registries via the autouse fixture to keep test isolation. """ +import pytest + from app.services.simulation_manager import SimulationManager, SimulationState, SimulationStatus +@pytest.fixture(autouse=True) +def _isolate_class_hooks(): + saved_ready = list(SimulationManager._on_ready_hooks) + saved_completed = list(SimulationManager._on_completed_hooks) + try: + yield + finally: + SimulationManager._on_ready_hooks[:] = saved_ready + SimulationManager._on_completed_hooks[:] = saved_completed + + def test_register_post_ready_hook_invoked(): called = [] mgr = SimulationManager() @@ -38,3 +54,43 @@ def test_register_post_completed_hook_invoked(): ) mgr._notify_on_completed(state) assert called == [("done", "abc")] + + +def test_hooks_survive_across_instances(): + """C3: hook registries are class-level, so callbacks registered through the + classmethod must still fire on a freshly constructed instance. This is + what makes the Flask per-request ``SimulationManager()`` pattern work + after ``install_hooks(SimulationManager)`` runs at app startup. + """ + called: list[str] = [] + + # Register via the class — the production install_hooks(cls) path. + SimulationManager.register_on_ready(lambda s: called.append(f"ready:{s.simulation_id}")) + SimulationManager.register_on_completed(lambda s: called.append(f"done:{s.simulation_id}")) + + # New, independently-constructed instance must still see the hooks. 
+ fresh = SimulationManager() + state = SimulationState( + simulation_id="cross_instance", + project_id="p", + graph_id="g", + status=SimulationStatus.READY, + ) + fresh._notify_on_ready(state) + state.status = SimulationStatus.COMPLETED + fresh._notify_on_completed(state) + + assert "ready:cross_instance" in called + assert "done:cross_instance" in called + + +def test_register_via_instance_also_lands_on_class(): + """Registering through an instance must populate the class registry too — + backward-compatibility with code that calls ``manager.register_on_*``. + """ + mgr1 = SimulationManager() + mgr1.register_on_ready(lambda s: None) + # A second, unrelated instance must see the hook. + mgr2 = SimulationManager() + assert len(SimulationManager._on_ready_hooks) >= 1 + assert SimulationManager._on_ready_hooks is mgr2.__class__._on_ready_hooks diff --git a/backend/tests/interviews/test_zep_writer.py b/backend/tests/interviews/test_zep_writer.py index 661ef44b..6eaed454 100644 --- a/backend/tests/interviews/test_zep_writer.py +++ b/backend/tests/interviews/test_zep_writer.py @@ -1,16 +1,26 @@ +import pytest + from app.models.interview import ( LikertResponse, InterviewPhase, SubagentKind, ) from app.services.interviews.zep_writer import InterviewZepWriter + class _FakeMemoryUpdater: + """Fake mirroring the real ZepGraphMemoryUpdater contract. + + Post-C4 the writer only uses ``add_text_episode(graph_id, text)`` — + ``add_activity`` is deliberately omitted to lock in the new behaviour and + catch any regression that re-introduces the broken dict-based fallback. 
+ """ + def __init__(self): - self.events = [] - def add_activity(self, activity): - self.events.append(activity) + self.events: list[dict] = [] + def add_text_episode(self, graph_id, text): self.events.append({"graph_id": graph_id, "text": text}) + def test_per_agent_episode_text(): upd = _FakeMemoryUpdater() w = InterviewZepWriter(memory_updater=upd, graph_id="g1") @@ -20,9 +30,48 @@ def test_per_agent_episode_text(): w.write_per_agent(SubagentKind.LONGITUDINAL, r, agent_name="Fischer Müller") assert any("Fischer Müller" in str(e) for e in upd.events) assert any("longitudinal/T1" in str(e) for e in upd.events) + # Each event must carry the configured graph_id. + assert all(e["graph_id"] == "g1" for e in upd.events) + def test_aggregate_episode(): upd = _FakeMemoryUpdater() w = InterviewZepWriter(memory_updater=upd, graph_id="g1") w.write_aggregate(SubagentKind.SCENARIO, summary="S1 mean desirability 5.2; S2 mean 2.1") assert any("S1 mean" in str(e) for e in upd.events) + + +def test_emit_uses_add_text_episode_with_graph_id(): + """C4: ``_emit`` must call ``updater.add_text_episode(graph_id, text)`` + with the constructor's graph_id and the raw text — no dict shape, no + ``add_activity`` fallback (the real ``add_activity`` rejects dicts). + """ + upd = _FakeMemoryUpdater() + w = InterviewZepWriter(memory_updater=upd, graph_id="g_xyz") + w._emit("hello world") + assert upd.events == [{"graph_id": "g_xyz", "text": "hello world"}] + + +def test_emit_raises_when_updater_lacks_add_text_episode(): + """C4: a memory_updater without ``add_text_episode`` must surface a + RuntimeError rather than silently no-op via a broken ``add_activity`` + fallback. 
+ """ + + class _Broken: + def add_activity(self, activity): # pragma: no cover - kept for clarity + raise AssertionError("must not be called") + + w = InterviewZepWriter(memory_updater=_Broken(), graph_id="g1") + with pytest.raises(RuntimeError, match="add_text_episode"): + w._emit("x") + + +def test_real_updater_exposes_add_text_episode(): + """C4 sanity check: ZepGraphMemoryUpdater (the real class) must expose + ``add_text_episode`` so the production wiring works without falling + through to the broken ``add_activity(dict)`` path. + """ + from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater + + assert hasattr(ZepGraphMemoryUpdater, "add_text_episode")