From 6b04ea5c271154abebb3434e8a9752d410e8f48c Mon Sep 17 00:00:00 2001 From: Christian Moellmann Date: Sat, 23 May 2026 12:51:13 +0200 Subject: [PATCH] =?UTF-8?q?feat(interviews):=20auto-trigger=20lifecycle=20?= =?UTF-8?q?hooks=20+=20bridge=20SimulationRunner=E2=86=92Manager=20on=20CO?= =?UTF-8?q?MPLETED?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add backend/app/services/interviews/lifecycle.py with install_hooks() that registers on_ready (pre-survey) and on_completed (post-survey + synthesis) daemon-thread callbacks on a SimulationManager. - Add SimulationRunner.register_on_completed() / _fire_on_completed() so external callbacks can be notified when _monitor_simulation transitions to COMPLETED (both exit-code-0 path and simulation_end event path). - Wire both in app/__init__.py: create singleton SimulationManager, install lifecycle hooks, and register its _notify_on_completed with SimulationRunner. - Add test_lifecycle.py: verifies install_hooks registers one callable for each of ready and completed. - All 40 unit tests + 2 integration tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/__init__.py | 13 ++++ backend/app/services/interviews/lifecycle.py | 72 ++++++++++++++++++++ backend/app/services/simulation_runner.py | 26 ++++++- backend/tests/interviews/test_lifecycle.py | 26 +++++++ 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 backend/app/services/interviews/lifecycle.py create mode 100644 backend/tests/interviews/test_lifecycle.py diff --git a/backend/app/__init__.py b/backend/app/__init__.py index c2a36fd2..d3a6d543 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -47,6 +47,19 @@ def create_app(config_class=Config): SimulationRunner.register_cleanup() if should_log_startup: logger.info("已注册模拟进程清理函数") + + # Install interview lifecycle hooks on a singleton SimulationManager. + # The singleton's _notify_on_completed is also wired into SimulationRunner + # so that the runner's monitor thread fires the completed hooks when a + # simulation process exits successfully. + from .services.simulation_manager import SimulationManager + from .services.interviews.lifecycle import install_hooks + + _simulation_manager_singleton = SimulationManager() + install_hooks(_simulation_manager_singleton) + SimulationRunner.register_on_completed(_simulation_manager_singleton._notify_on_completed) + if should_log_startup: + logger.info("已安装面试生命周期钩子") # 请求日志中间件 @app.before_request diff --git a/backend/app/services/interviews/lifecycle.py b/backend/app/services/interviews/lifecycle.py new file mode 100644 index 00000000..5e2d351d --- /dev/null +++ b/backend/app/services/interviews/lifecycle.py @@ -0,0 +1,72 @@ +""" +Interview lifecycle hook installer (Task 20). + +install_hooks(manager) registers two callbacks on a SimulationManager: + - on_ready → spawn T0 longitudinal pre-survey in a background thread + - on_completed → spawn full post-sim batch + synthesis in a background thread + +Both hooks are best-effort: failures are logged but never propagate to the +calling thread. +""" + +from __future__ import annotations + +import threading + +from app.utils.logger import get_logger + +logger = get_logger(__name__) + + +def install_hooks(manager) -> None: + """Attach interview lifecycle callbacks to a SimulationManager. + + on_ready → spawn T0 longitudinal in a background thread + on_completed → spawn full post-sim batch in a background thread + Hooks are best-effort; failures only log. + """ + + def _on_ready(state) -> None: + sim_id = ( + getattr(state, "simulation_id", None) + or getattr(state, "sim_id", None) + or getattr(state, "id", None) + ) + if not sim_id: + return + threading.Thread(target=_run_pre, args=(sim_id,), daemon=True).start() + + def _on_completed(state) -> None: + sim_id = ( + getattr(state, "simulation_id", None) + or getattr(state, "sim_id", None) + or getattr(state, "id", None) + ) + if not sim_id: + return + threading.Thread(target=_run_post, args=(sim_id,), daemon=True).start() + + manager.register_on_ready(_on_ready) + manager.register_on_completed(_on_completed) + + +def _run_pre(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + + orch = _build_orchestrator(sim_id) + orch.run_pre() + except Exception as e: + logger.warning(f"auto pre-survey failed for {sim_id}: {e!r}") + + +def _run_post(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + from app.services.interview_synthesizer import InterviewSynthesizer + + orch = _build_orchestrator(sim_id) + orch.run_post() + InterviewSynthesizer(store=orch.store).run() + except Exception as e: + logger.warning(f"auto post-survey failed for {sim_id}: {e!r}") diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index e86021f8..942f522f 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -226,7 +226,29 @@ class SimulationRunner: # 图谱记忆更新配置 _graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled - + + # Completion callbacks registered from outside (e.g. SimulationManager lifecycle hooks). + # Each callable receives the SimulationRunState that just transitioned to COMPLETED. + _on_completed_callbacks: list = [] + + @classmethod + def register_on_completed(cls, fn) -> None: + """Register a callback invoked when a simulation transitions to COMPLETED. + + The callback receives the SimulationRunState instance. It is called from + the monitor daemon thread, so keep it short or hand off to another thread. + """ + cls._on_completed_callbacks.append(fn) + + @classmethod + def _fire_on_completed(cls, state: SimulationRunState) -> None: + """Invoke all registered on_completed callbacks; exceptions are isolated.""" + for fn in list(cls._on_completed_callbacks): + try: + fn(state) + except Exception as e: + logger.warning(f"on_completed callback failed: {e!r}") + @classmethod def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: """获取运行状态""" @@ -528,6 +550,7 @@ class SimulationRunner: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"模拟完成: {simulation_id}") + cls._fire_on_completed(state) else: state.runner_status = RunnerStatus.FAILED # 从主日志文件读取错误信息 @@ -638,6 +661,7 @@ class SimulationRunner: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"所有平台模拟已完成: {state.simulation_id}") + cls._fire_on_completed(state) # 更新轮次信息(从 round_end 事件) elif event_type == "round_end": diff --git a/backend/tests/interviews/test_lifecycle.py b/backend/tests/interviews/test_lifecycle.py new file mode 100644 index 00000000..f8d2c952 --- /dev/null +++ b/backend/tests/interviews/test_lifecycle.py @@ -0,0 +1,26 @@ +""" +Tests for interview lifecycle hook installer (Task 20). +""" + +from app.services.interviews.lifecycle import install_hooks + + +class _StubMgr: + def __init__(self): + self.ready = [] + self.completed = [] + + def register_on_ready(self, fn): + self.ready.append(fn) + + def register_on_completed(self, fn): + self.completed.append(fn) + + +def test_install_hooks_registers_two_callables(): + mgr = _StubMgr() + install_hooks(mgr) + assert len(mgr.ready) == 1 + assert len(mgr.completed) == 1 + assert callable(mgr.ready[0]) + assert callable(mgr.completed[0])