diff --git a/backend/app/__init__.py b/backend/app/__init__.py index c2a36fd2..d3a6d543 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -47,6 +47,19 @@ def create_app(config_class=Config): SimulationRunner.register_cleanup() if should_log_startup: logger.info("已注册模拟进程清理函数") + + # Install interview lifecycle hooks on a singleton SimulationManager. + # The singleton's _notify_on_completed is also wired into SimulationRunner + # so that the runner's monitor thread fires the completed hooks when a + # simulation process exits successfully. + from .services.simulation_manager import SimulationManager + from .services.interviews.lifecycle import install_hooks + + _simulation_manager_singleton = SimulationManager() + install_hooks(_simulation_manager_singleton) + SimulationRunner.register_on_completed(_simulation_manager_singleton._notify_on_completed) + if should_log_startup: + logger.info("已安装面试生命周期钩子") # 请求日志中间件 @app.before_request diff --git a/backend/app/services/interviews/lifecycle.py b/backend/app/services/interviews/lifecycle.py new file mode 100644 index 00000000..5e2d351d --- /dev/null +++ b/backend/app/services/interviews/lifecycle.py @@ -0,0 +1,72 @@ +""" +Interview lifecycle hook installer (Task 20). + +install_hooks(manager) registers two callbacks on a SimulationManager: + - on_ready → spawn T0 longitudinal pre-survey in a background thread + - on_completed → spawn full post-sim batch + synthesis in a background thread + +Both hooks are best-effort: failures are logged but never propagate to the +calling thread. +""" + +from __future__ import annotations + +import threading + +from app.utils.logger import get_logger + +logger = get_logger(__name__) + + +def install_hooks(manager) -> None: + """Attach interview lifecycle callbacks to a SimulationManager. + + on_ready → spawn T0 longitudinal in a background thread + on_completed → spawn full post-sim batch in a background thread + Hooks are best-effort; failures only log. + """ + + def _on_ready(state) -> None: + sim_id = ( + getattr(state, "simulation_id", None) + or getattr(state, "sim_id", None) + or getattr(state, "id", None) + ) + if not sim_id: + return + threading.Thread(target=_run_pre, args=(sim_id,), daemon=True).start() + + def _on_completed(state) -> None: + sim_id = ( + getattr(state, "simulation_id", None) + or getattr(state, "sim_id", None) + or getattr(state, "id", None) + ) + if not sim_id: + return + threading.Thread(target=_run_post, args=(sim_id,), daemon=True).start() + + manager.register_on_ready(_on_ready) + manager.register_on_completed(_on_completed) + + +def _run_pre(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + + orch = _build_orchestrator(sim_id) + orch.run_pre() + except Exception as e: + logger.warning(f"auto pre-survey failed for {sim_id}: {e!r}") + + +def _run_post(sim_id: str) -> None: + try: + from app.api.interview import _build_orchestrator + from app.services.interview_synthesizer import InterviewSynthesizer + + orch = _build_orchestrator(sim_id) + orch.run_post() + InterviewSynthesizer(store=orch.store).run() + except Exception as e: + logger.warning(f"auto post-survey failed for {sim_id}: {e!r}") diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index e86021f8..942f522f 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -226,7 +226,29 @@ class SimulationRunner: # 图谱记忆更新配置 _graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled - + + # Completion callbacks registered from outside (e.g. SimulationManager lifecycle hooks). + # Each callable receives the SimulationRunState that just transitioned to COMPLETED. + _on_completed_callbacks: list = [] + + @classmethod + def register_on_completed(cls, fn) -> None: + """Register a callback invoked when a simulation transitions to COMPLETED. + + The callback receives the SimulationRunState instance. It is called from + the monitor daemon thread, so keep it short or hand off to another thread. + """ + cls._on_completed_callbacks.append(fn) + + @classmethod + def _fire_on_completed(cls, state: SimulationRunState) -> None: + """Invoke all registered on_completed callbacks; exceptions are isolated.""" + for fn in list(cls._on_completed_callbacks): + try: + fn(state) + except Exception as e: + logger.warning(f"on_completed callback failed: {e!r}") + @classmethod def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: """获取运行状态""" @@ -528,6 +550,7 @@ class SimulationRunner: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"模拟完成: {simulation_id}") + cls._fire_on_completed(state) else: state.runner_status = RunnerStatus.FAILED # 从主日志文件读取错误信息 @@ -638,6 +661,7 @@ class SimulationRunner: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"所有平台模拟已完成: {state.simulation_id}") + cls._fire_on_completed(state) # 更新轮次信息(从 round_end 事件) elif event_type == "round_end": diff --git a/backend/tests/interviews/test_lifecycle.py b/backend/tests/interviews/test_lifecycle.py new file mode 100644 index 00000000..f8d2c952 --- /dev/null +++ b/backend/tests/interviews/test_lifecycle.py @@ -0,0 +1,26 @@ +""" +Tests for interview lifecycle hook installer (Task 20). +""" + +from app.services.interviews.lifecycle import install_hooks + + +class _StubMgr: + def __init__(self): + self.ready = [] + self.completed = [] + + def register_on_ready(self, fn): + self.ready.append(fn) + + def register_on_completed(self, fn): + self.completed.append(fn) + + +def test_install_hooks_registers_two_callables(): + mgr = _StubMgr() + install_hooks(mgr) + assert len(mgr.ready) == 1 + assert len(mgr.completed) == 1 + assert callable(mgr.ready[0]) + assert callable(mgr.completed[0])