feat(interviews): on_ready / on_completed hook registry on SimulationManager

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Christian Moellmann 2026-05-23 12:29:30 +02:00
parent b3e2039817
commit 3322bcb20c
2 changed files with 77 additions and 2 deletions

View File

@ -132,9 +132,13 @@ class SimulationManager:
def __init__(self): def __init__(self):
# 确保目录存在 # 确保目录存在
os.makedirs(self.SIMULATION_DATA_DIR, exist_ok=True) os.makedirs(self.SIMULATION_DATA_DIR, exist_ok=True)
# 内存中的模拟状态缓存 # 内存中的模拟状态缓存
self._simulations: Dict[str, SimulationState] = {} self._simulations: Dict[str, SimulationState] = {}
# Lifecycle hook registries
self._on_ready_hooks: list = []
self._on_completed_hooks: list = []
def _get_simulation_dir(self, simulation_id: str) -> str: def _get_simulation_dir(self, simulation_id: str) -> str:
"""获取模拟数据目录""" """获取模拟数据目录"""
@ -191,6 +195,36 @@ class SimulationManager:
self._simulations[simulation_id] = state self._simulations[simulation_id] = state
return state return state
# ------------------------------------------------------------------
# Lifecycle hook registration
# ------------------------------------------------------------------
def register_on_ready(self, fn) -> None:
"""Register a callback invoked when a simulation transitions to READY."""
self._on_ready_hooks.append(fn)
def register_on_completed(self, fn) -> None:
"""Register a callback invoked when a simulation transitions to COMPLETED."""
self._on_completed_hooks.append(fn)
def _notify_on_ready(self, state: "SimulationState") -> None:
"""Invoke all on_ready hooks; exceptions are isolated per hook."""
for fn in list(self._on_ready_hooks):
try:
fn(state)
except Exception as e:
logger.warning(f"on_ready hook failed: {e!r}")
def _notify_on_completed(self, state: "SimulationState") -> None:
"""Invoke all on_completed hooks; exceptions are isolated per hook."""
for fn in list(self._on_completed_hooks):
try:
fn(state)
except Exception as e:
logger.warning(f"on_completed hook failed: {e!r}")
# ------------------------------------------------------------------
def create_simulation( def create_simulation(
self, self,
project_id: str, project_id: str,
@ -441,7 +475,8 @@ class SimulationManager:
# 更新状态 # 更新状态
state.status = SimulationStatus.READY state.status = SimulationStatus.READY
self._save_simulation_state(state) self._save_simulation_state(state)
self._notify_on_ready(state)
logger.info(f"模拟准备完成: {simulation_id}, " logger.info(f"模拟准备完成: {simulation_id}, "
f"entities={state.entities_count}, profiles={state.profiles_count}") f"entities={state.entities_count}, profiles={state.profiles_count}")

View File

@ -0,0 +1,40 @@
"""
Tests for SimulationManager lifecycle hooks (on_ready / on_completed).
NOTE ON SHAPE DIVERGENCE vs. original plan spec:
- SimulationState uses `simulation_id` (not `sim_id`)
- `status` is a SimulationStatus enum, not a plain string
- The COMPLETED transition lives in simulation_runner.py (SimulationRunner._monitor_simulation),
not in simulation_manager.py. The _notify_on_completed hook is registered on SimulationManager
and the production insertion point for COMPLETED is documented in DONE_WITH_CONCERNS.
"""
from app.services.simulation_manager import SimulationManager, SimulationState, SimulationStatus
def test_register_post_ready_hook_invoked():
called = []
mgr = SimulationManager()
mgr.register_on_ready(lambda state: called.append(("ready", state.simulation_id)))
state = SimulationState(
simulation_id="abc",
project_id="proj1",
graph_id="graph1",
status=SimulationStatus.READY,
)
mgr._notify_on_ready(state)
assert called == [("ready", "abc")]
def test_register_post_completed_hook_invoked():
called = []
mgr = SimulationManager()
mgr.register_on_completed(lambda state: called.append(("done", state.simulation_id)))
state = SimulationState(
simulation_id="abc",
project_id="proj1",
graph_id="graph1",
status=SimulationStatus.COMPLETED,
)
mgr._notify_on_completed(state)
assert called == [("done", "abc")]