feat(interviews): auto-trigger lifecycle hooks + bridge SimulationRunner→Manager on COMPLETED

- Add backend/app/services/interviews/lifecycle.py with install_hooks() that
  registers on_ready (pre-survey) and on_completed (post-survey + synthesis)
  daemon-thread callbacks on a SimulationManager.
- Add SimulationRunner.register_on_completed() / _fire_on_completed() so
  external callbacks can be notified when _monitor_simulation transitions to
  COMPLETED (both exit-code-0 path and simulation_end event path).
- Wire both in app/__init__.py: create singleton SimulationManager, install
  lifecycle hooks, and register its _notify_on_completed with SimulationRunner.
- Add test_lifecycle.py: verifies install_hooks registers one callable for each
  of ready and completed.
- All 40 unit tests + 2 integration tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Christian Moellmann 2026-05-23 12:51:13 +02:00
parent acaa06170e
commit 6b04ea5c27
4 changed files with 136 additions and 1 deletions

View File

@ -48,6 +48,19 @@ def create_app(config_class=Config):
if should_log_startup: if should_log_startup:
logger.info("已注册模拟进程清理函数") logger.info("已注册模拟进程清理函数")
# Install interview lifecycle hooks on a singleton SimulationManager.
# The singleton's _notify_on_completed is also wired into SimulationRunner
# so that the runner's monitor thread fires the completed hooks when a
# simulation process exits successfully.
from .services.simulation_manager import SimulationManager
from .services.interviews.lifecycle import install_hooks
_simulation_manager_singleton = SimulationManager()
install_hooks(_simulation_manager_singleton)
SimulationRunner.register_on_completed(_simulation_manager_singleton._notify_on_completed)
if should_log_startup:
logger.info("已安装面试生命周期钩子")
# 请求日志中间件 # 请求日志中间件
@app.before_request @app.before_request
def log_request(): def log_request():

View File

@ -0,0 +1,72 @@
"""
Interview lifecycle hook installer (Task 20).
install_hooks(manager) registers two callbacks on a SimulationManager:
- on_ready spawn T0 longitudinal pre-survey in a background thread
- on_completed spawn full post-sim batch + synthesis in a background thread
Both hooks are best-effort: failures are logged but never propagate to the
calling thread.
"""
from __future__ import annotations
import threading
from app.utils.logger import get_logger
logger = get_logger(__name__)
def install_hooks(manager) -> None:
"""Attach interview lifecycle callbacks to a SimulationManager.
on_ready spawn T0 longitudinal in a background thread
on_completed spawn full post-sim batch in a background thread
Hooks are best-effort; failures only log.
"""
def _on_ready(state) -> None:
sim_id = (
getattr(state, "simulation_id", None)
or getattr(state, "sim_id", None)
or getattr(state, "id", None)
)
if not sim_id:
return
threading.Thread(target=_run_pre, args=(sim_id,), daemon=True).start()
def _on_completed(state) -> None:
sim_id = (
getattr(state, "simulation_id", None)
or getattr(state, "sim_id", None)
or getattr(state, "id", None)
)
if not sim_id:
return
threading.Thread(target=_run_post, args=(sim_id,), daemon=True).start()
manager.register_on_ready(_on_ready)
manager.register_on_completed(_on_completed)
def _run_pre(sim_id: str) -> None:
try:
from app.api.interview import _build_orchestrator
orch = _build_orchestrator(sim_id)
orch.run_pre()
except Exception as e:
logger.warning(f"auto pre-survey failed for {sim_id}: {e!r}")
def _run_post(sim_id: str) -> None:
try:
from app.api.interview import _build_orchestrator
from app.services.interview_synthesizer import InterviewSynthesizer
orch = _build_orchestrator(sim_id)
orch.run_post()
InterviewSynthesizer(store=orch.store).run()
except Exception as e:
logger.warning(f"auto post-survey failed for {sim_id}: {e!r}")

View File

@ -227,6 +227,28 @@ class SimulationRunner:
# 图谱记忆更新配置 # 图谱记忆更新配置
_graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled _graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled
# Completion callbacks registered from outside (e.g. SimulationManager lifecycle hooks).
# Each callable receives the SimulationRunState that just transitioned to COMPLETED.
_on_completed_callbacks: list = []
@classmethod
def register_on_completed(cls, fn) -> None:
"""Register a callback invoked when a simulation transitions to COMPLETED.
The callback receives the SimulationRunState instance. It is called from
the monitor daemon thread, so keep it short or hand off to another thread.
"""
cls._on_completed_callbacks.append(fn)
@classmethod
def _fire_on_completed(cls, state: SimulationRunState) -> None:
"""Invoke all registered on_completed callbacks; exceptions are isolated."""
for fn in list(cls._on_completed_callbacks):
try:
fn(state)
except Exception as e:
logger.warning(f"on_completed callback failed: {e!r}")
@classmethod @classmethod
def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]:
"""获取运行状态""" """获取运行状态"""
@ -528,6 +550,7 @@ class SimulationRunner:
state.runner_status = RunnerStatus.COMPLETED state.runner_status = RunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat() state.completed_at = datetime.now().isoformat()
logger.info(f"模拟完成: {simulation_id}") logger.info(f"模拟完成: {simulation_id}")
cls._fire_on_completed(state)
else: else:
state.runner_status = RunnerStatus.FAILED state.runner_status = RunnerStatus.FAILED
# 从主日志文件读取错误信息 # 从主日志文件读取错误信息
@ -638,6 +661,7 @@ class SimulationRunner:
state.runner_status = RunnerStatus.COMPLETED state.runner_status = RunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat() state.completed_at = datetime.now().isoformat()
logger.info(f"所有平台模拟已完成: {state.simulation_id}") logger.info(f"所有平台模拟已完成: {state.simulation_id}")
cls._fire_on_completed(state)
# 更新轮次信息(从 round_end 事件) # 更新轮次信息(从 round_end 事件)
elif event_type == "round_end": elif event_type == "round_end":

View File

@ -0,0 +1,26 @@
"""
Tests for interview lifecycle hook installer (Task 20).
"""
from app.services.interviews.lifecycle import install_hooks
class _StubMgr:
def __init__(self):
self.ready = []
self.completed = []
def register_on_ready(self, fn):
self.ready.append(fn)
def register_on_completed(self, fn):
self.completed.append(fn)
def test_install_hooks_registers_two_callables():
mgr = _StubMgr()
install_hooks(mgr)
assert len(mgr.ready) == 1
assert len(mgr.completed) == 1
assert callable(mgr.ready[0])
assert callable(mgr.completed[0])