""" OASIS simulation runner Runs simulations in the background, records each agent's actions, and supports real-time status monitoring """ import os import sys import json import time import asyncio import threading import subprocess import signal import atexit from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass, field from datetime import datetime from enum import Enum from queue import Queue from ..config import Config from ..utils.logger import get_logger from ..utils.locale import get_locale, set_locale from .zep_graph_memory_updater import ZepGraphMemoryManager from .simulation_ipc import SimulationIPCClient, CommandType, IPCResponse logger = get_logger('mirofish.simulation_runner') # Flag indicating whether the cleanup function has been registered _cleanup_registered = False # Platform detection IS_WINDOWS = sys.platform == 'win32' class RunnerStatus(str, Enum): """Runner status""" IDLE = "idle" STARTING = "starting" RUNNING = "running" PAUSED = "paused" STOPPING = "stopping" STOPPED = "stopped" COMPLETED = "completed" FAILED = "failed" @dataclass class AgentAction: """Agent action record""" round_num: int timestamp: str platform: str # twitter / reddit agent_id: int agent_name: str action_type: str # CREATE_POST, LIKE_POST, etc. action_args: Dict[str, Any] = field(default_factory=dict) result: Optional[str] = None success: bool = True def to_dict(self) -> Dict[str, Any]: return { "round_num": self.round_num, "timestamp": self.timestamp, "platform": self.platform, "agent_id": self.agent_id, "agent_name": self.agent_name, "action_type": self.action_type, "action_args": self.action_args, "result": self.result, "success": self.success, } @dataclass class RoundSummary: """Per-round summary""" round_num: int start_time: str end_time: Optional[str] = None simulated_hour: int = 0 twitter_actions: int = 0 reddit_actions: int = 0 active_agents: List[int] = field(default_factory=list) actions: List[AgentAction] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "round_num": self.round_num, "start_time": self.start_time, "end_time": self.end_time, "simulated_hour": self.simulated_hour, "twitter_actions": self.twitter_actions, "reddit_actions": self.reddit_actions, "active_agents": self.active_agents, "actions_count": len(self.actions), "actions": [a.to_dict() for a in self.actions], } @dataclass class SimulationRunState: """Simulation run state (real-time)""" simulation_id: str runner_status: RunnerStatus = RunnerStatus.IDLE # Progress info current_round: int = 0 total_rounds: int = 0 simulated_hours: int = 0 total_simulation_hours: int = 0 # Per-platform independent rounds and simulated time (for parallel dual-platform display) twitter_current_round: int = 0 reddit_current_round: int = 0 twitter_simulated_hours: int = 0 reddit_simulated_hours: int = 0 # Platform status twitter_running: bool = False reddit_running: bool = False twitter_actions_count: int = 0 reddit_actions_count: int = 0 # Platform completion status (detected via simulation_end events in actions.jsonl) twitter_completed: bool = False reddit_completed: bool = False # Per-round summaries rounds: List[RoundSummary] = field(default_factory=list) # Recent actions (for real-time frontend display) recent_actions: List[AgentAction] = field(default_factory=list) max_recent_actions: int = 50 # Timestamps started_at: Optional[str] = None updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) completed_at: Optional[str] = None # Error info error: Optional[str] = None # Process ID (for stopping) process_pid: Optional[int] = None def add_action(self, action: AgentAction): """Add an action to the recent actions list""" self.recent_actions.insert(0, action) if len(self.recent_actions) > self.max_recent_actions: self.recent_actions = self.recent_actions[:self.max_recent_actions] if action.platform == "twitter": self.twitter_actions_count += 1 else: self.reddit_actions_count += 1 self.updated_at = datetime.now().isoformat() def to_dict(self) -> Dict[str, Any]: return { "simulation_id": self.simulation_id, "runner_status": self.runner_status.value, "current_round": self.current_round, "total_rounds": self.total_rounds, "simulated_hours": self.simulated_hours, "total_simulation_hours": self.total_simulation_hours, "progress_percent": round(self.current_round / max(self.total_rounds, 1) * 100, 1), # Per-platform independent rounds and time "twitter_current_round": self.twitter_current_round, "reddit_current_round": self.reddit_current_round, "twitter_simulated_hours": self.twitter_simulated_hours, "reddit_simulated_hours": self.reddit_simulated_hours, "twitter_running": self.twitter_running, "reddit_running": self.reddit_running, "twitter_completed": self.twitter_completed, "reddit_completed": self.reddit_completed, "twitter_actions_count": self.twitter_actions_count, "reddit_actions_count": self.reddit_actions_count, "total_actions_count": self.twitter_actions_count + self.reddit_actions_count, "started_at": self.started_at, "updated_at": self.updated_at, "completed_at": self.completed_at, "error": self.error, "process_pid": self.process_pid, } def to_detail_dict(self) -> Dict[str, Any]: """Detailed info including recent actions""" result = self.to_dict() result["recent_actions"] = [a.to_dict() for a in self.recent_actions] result["rounds_count"] = len(self.rounds) return result class SimulationRunner: """ Simulation runner Responsibilities: 1. Run OASIS simulation in a background process 2. Parse run logs and record each agent's actions 3. Provide a real-time status query interface 4. Support pause/stop/resume operations """ # Run state storage directory RUN_STATE_DIR = os.path.join( os.path.dirname(__file__), '../../uploads/simulations' ) # Scripts directory SCRIPTS_DIR = os.path.join( os.path.dirname(__file__), '../../scripts' ) # In-memory run states _run_states: Dict[str, SimulationRunState] = {} _processes: Dict[str, subprocess.Popen] = {} _action_queues: Dict[str, Queue] = {} _monitor_threads: Dict[str, threading.Thread] = {} _stdout_files: Dict[str, Any] = {} # stdout file handles _stderr_files: Dict[str, Any] = {} # stderr file handles # Graph memory update configuration _graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled @classmethod def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: """Get run state""" if simulation_id in cls._run_states: return cls._run_states[simulation_id] # Try to load from file state = cls._load_run_state(simulation_id) if state: cls._run_states[simulation_id] = state return state @classmethod def _load_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]: """Load run state from file""" state_file = os.path.join(cls.RUN_STATE_DIR, simulation_id, "run_state.json") if not os.path.exists(state_file): return None try: with open(state_file, 'r', encoding='utf-8') as f: data = json.load(f) state = SimulationRunState( simulation_id=simulation_id, runner_status=RunnerStatus(data.get("runner_status", "idle")), current_round=data.get("current_round", 0), total_rounds=data.get("total_rounds", 0), simulated_hours=data.get("simulated_hours", 0), total_simulation_hours=data.get("total_simulation_hours", 0), # Per-platform independent rounds and time twitter_current_round=data.get("twitter_current_round", 0), reddit_current_round=data.get("reddit_current_round", 0), twitter_simulated_hours=data.get("twitter_simulated_hours", 0), reddit_simulated_hours=data.get("reddit_simulated_hours", 0), twitter_running=data.get("twitter_running", False), reddit_running=data.get("reddit_running", False), twitter_completed=data.get("twitter_completed", False), reddit_completed=data.get("reddit_completed", False), twitter_actions_count=data.get("twitter_actions_count", 0), reddit_actions_count=data.get("reddit_actions_count", 0), started_at=data.get("started_at"), updated_at=data.get("updated_at", datetime.now().isoformat()), completed_at=data.get("completed_at"), error=data.get("error"), process_pid=data.get("process_pid"), ) # Load recent actions actions_data = data.get("recent_actions", []) for a in actions_data: state.recent_actions.append(AgentAction( round_num=a.get("round_num", 0), timestamp=a.get("timestamp", ""), platform=a.get("platform", ""), agent_id=a.get("agent_id", 0), agent_name=a.get("agent_name", ""), action_type=a.get("action_type", ""), action_args=a.get("action_args", {}), result=a.get("result"), success=a.get("success", True), )) return state except Exception as e: logger.error(f"Failed to load run state: {str(e)}") return None @classmethod def _save_run_state(cls, state: SimulationRunState): """Save run state to file""" sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id) os.makedirs(sim_dir, exist_ok=True) state_file = os.path.join(sim_dir, "run_state.json") data = state.to_detail_dict() with open(state_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) cls._run_states[state.simulation_id] = state @classmethod def start_simulation( cls, simulation_id: str, platform: str = "parallel", # twitter / reddit / parallel max_rounds: int = None, # Maximum simulation rounds (optional, to cap long simulations) enable_graph_memory_update: bool = False, # Whether to update activities to the Zep graph graph_id: str = None # Zep graph ID (required when graph update is enabled) ) -> SimulationRunState: """ Start a simulation Args: simulation_id: simulation ID platform: run platform (twitter/reddit/parallel) max_rounds: maximum simulation rounds (optional, to cap long simulations) enable_graph_memory_update: whether to dynamically update agent activities to the Zep graph graph_id: Zep graph ID (required when graph update is enabled) Returns: SimulationRunState """ # Check if already running existing = cls.get_run_state(simulation_id) if existing and existing.runner_status in [RunnerStatus.RUNNING, RunnerStatus.STARTING]: raise ValueError(f"Simulation is already running: {simulation_id}") # Load simulation config sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) config_path = os.path.join(sim_dir, "simulation_config.json") if not os.path.exists(config_path): raise ValueError(f"Simulation config not found; please call /prepare first") with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) # Initialize run state time_config = config.get("time_config", {}) total_hours = time_config.get("total_simulation_hours", 72) minutes_per_round = time_config.get("minutes_per_round", 30) total_rounds = int(total_hours * 60 / minutes_per_round) # Truncate if max_rounds is specified if max_rounds is not None and max_rounds > 0: original_rounds = total_rounds total_rounds = min(total_rounds, max_rounds) if total_rounds < original_rounds: logger.info(f"Rounds truncated: {original_rounds} -> {total_rounds} (max_rounds={max_rounds})") state = SimulationRunState( simulation_id=simulation_id, runner_status=RunnerStatus.STARTING, total_rounds=total_rounds, total_simulation_hours=total_hours, started_at=datetime.now().isoformat(), ) cls._save_run_state(state) # Create graph memory updater if enabled if enable_graph_memory_update: if not graph_id: raise ValueError("graph_id is required when graph memory update is enabled") try: ZepGraphMemoryManager.create_updater(simulation_id, graph_id) cls._graph_memory_enabled[simulation_id] = True logger.info(f"Graph memory update enabled: simulation_id={simulation_id}, graph_id={graph_id}") except Exception as e: logger.error(f"Failed to create graph memory updater: {e}") cls._graph_memory_enabled[simulation_id] = False else: cls._graph_memory_enabled[simulation_id] = False # Determine which script to run (scripts are in the backend/scripts/ directory) if platform == "twitter": script_name = "run_twitter_simulation.py" state.twitter_running = True elif platform == "reddit": script_name = "run_reddit_simulation.py" state.reddit_running = True else: script_name = "run_parallel_simulation.py" state.twitter_running = True state.reddit_running = True script_path = os.path.join(cls.SCRIPTS_DIR, script_name) if not os.path.exists(script_path): raise ValueError(f"Script not found: {script_path}") # Create action queue action_queue = Queue() cls._action_queues[simulation_id] = action_queue # Start simulation process try: # Build run command with full paths. # New log structure: # twitter/actions.jsonl - Twitter action log # reddit/actions.jsonl - Reddit action log # simulation.log - main process log cmd = [ sys.executable, # Python interpreter script_path, "--config", config_path, # use full config file path ] # Append max_rounds to command line if specified if max_rounds is not None and max_rounds > 0: cmd.extend(["--max-rounds", str(max_rounds)]) # Create main log file to avoid stdout/stderr pipe buffer filling up and blocking the process main_log_path = os.path.join(sim_dir, "simulation.log") main_log_file = open(main_log_path, 'w', encoding='utf-8') # Set subprocess environment variables to ensure UTF-8 encoding on Windows. # This fixes issues where third-party libraries (e.g. OASIS) open files without specifying an encoding. env = os.environ.copy() env['PYTHONUTF8'] = '1' # Python 3.7+: makes all open() calls default to UTF-8 env['PYTHONIOENCODING'] = 'utf-8' # Ensures stdout/stderr use UTF-8 # Set working directory to the simulation directory (databases and other files are created there). # Use start_new_session=True to create a new process group so all child processes can be # terminated via os.killpg when needed. process = subprocess.Popen( cmd, cwd=sim_dir, stdout=main_log_file, stderr=subprocess.STDOUT, # stderr also written to the same file text=True, encoding='utf-8', # Explicit encoding bufsize=1, env=env, # Pass environment with UTF-8 settings start_new_session=True, # Create new process group so server shutdown can terminate all related processes ) # Save file handles for later cleanup cls._stdout_files[simulation_id] = main_log_file cls._stderr_files[simulation_id] = None # No separate stderr file needed state.process_pid = process.pid state.runner_status = RunnerStatus.RUNNING cls._processes[simulation_id] = process cls._save_run_state(state) # Capture locale before spawning monitor thread current_locale = get_locale() # Start monitor thread monitor_thread = threading.Thread( target=cls._monitor_simulation, args=(simulation_id, current_locale), daemon=True ) monitor_thread.start() cls._monitor_threads[simulation_id] = monitor_thread logger.info(f"Simulation started: {simulation_id}, pid={process.pid}, platform={platform}") except Exception as e: state.runner_status = RunnerStatus.FAILED state.error = str(e) cls._save_run_state(state) raise return state @classmethod def _monitor_simulation(cls, simulation_id: str, locale: str = 'zh'): """Monitor the simulation process and parse action logs""" set_locale(locale) sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) # New log structure: per-platform action logs twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl") reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl") process = cls._processes.get(simulation_id) state = cls.get_run_state(simulation_id) if not process or not state: return twitter_position = 0 reddit_position = 0 try: while process.poll() is None: # Process is still running # Read Twitter action log if os.path.exists(twitter_actions_log): twitter_position = cls._read_action_log( twitter_actions_log, twitter_position, state, "twitter" ) # Read Reddit action log if os.path.exists(reddit_actions_log): reddit_position = cls._read_action_log( reddit_actions_log, reddit_position, state, "reddit" ) # Save state cls._save_run_state(state) time.sleep(2) # After process exits, do a final log read if os.path.exists(twitter_actions_log): cls._read_action_log(twitter_actions_log, twitter_position, state, "twitter") if os.path.exists(reddit_actions_log): cls._read_action_log(reddit_actions_log, reddit_position, state, "reddit") # Process has exited exit_code = process.returncode if exit_code == 0: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"Simulation completed: {simulation_id}") else: state.runner_status = RunnerStatus.FAILED # Read error info from main log file main_log_path = os.path.join(sim_dir, "simulation.log") error_info = "" try: if os.path.exists(main_log_path): with open(main_log_path, 'r', encoding='utf-8') as f: error_info = f.read()[-2000:] # Last 2000 characters except Exception: pass state.error = f"Process exit code: {exit_code}, error: {error_info}" logger.error(f"Simulation failed: {simulation_id}, error={state.error}") state.twitter_running = False state.reddit_running = False cls._save_run_state(state) except Exception as e: logger.error(f"Monitor thread exception: {simulation_id}, error={str(e)}") state.runner_status = RunnerStatus.FAILED state.error = str(e) cls._save_run_state(state) finally: # Stop graph memory updater if cls._graph_memory_enabled.get(simulation_id, False): try: ZepGraphMemoryManager.stop_updater(simulation_id) logger.info(f"Graph memory update stopped: simulation_id={simulation_id}") except Exception as e: logger.error(f"Failed to stop graph memory updater: {e}") cls._graph_memory_enabled.pop(simulation_id, None) # Clean up process resources cls._processes.pop(simulation_id, None) cls._action_queues.pop(simulation_id, None) # Close log file handles if simulation_id in cls._stdout_files: try: cls._stdout_files[simulation_id].close() except Exception: pass cls._stdout_files.pop(simulation_id, None) if simulation_id in cls._stderr_files and cls._stderr_files[simulation_id]: try: cls._stderr_files[simulation_id].close() except Exception: pass cls._stderr_files.pop(simulation_id, None) @classmethod def _read_action_log( cls, log_path: str, position: int, state: SimulationRunState, platform: str ) -> int: """ Read an action log file Args: log_path: path to the log file position: last read position state: run state object platform: platform name (twitter/reddit) Returns: New read position """ # Check whether graph memory update is enabled graph_memory_enabled = cls._graph_memory_enabled.get(state.simulation_id, False) graph_updater = None if graph_memory_enabled: graph_updater = ZepGraphMemoryManager.get_updater(state.simulation_id) try: with open(log_path, 'r', encoding='utf-8') as f: f.seek(position) for line in f: line = line.strip() if line: try: action_data = json.loads(line) # Handle event-type entries if "event_type" in action_data: event_type = action_data.get("event_type") # Detect simulation_end event and mark platform as completed if event_type == "simulation_end": if platform == "twitter": state.twitter_completed = True state.twitter_running = False logger.info(f"Twitter simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") elif platform == "reddit": state.reddit_completed = True state.reddit_running = False logger.info(f"Reddit simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") # Check if all enabled platforms have completed. # If only one platform is running, check only that one. # If both platforms are running, both must complete. all_completed = cls._check_all_platforms_completed(state) if all_completed: state.runner_status = RunnerStatus.COMPLETED state.completed_at = datetime.now().isoformat() logger.info(f"All platform simulations completed: {state.simulation_id}") # Update round info (from round_end events) elif event_type == "round_end": round_num = action_data.get("round", 0) simulated_hours = action_data.get("simulated_hours", 0) # Update per-platform independent rounds and time if platform == "twitter": if round_num > state.twitter_current_round: state.twitter_current_round = round_num state.twitter_simulated_hours = simulated_hours elif platform == "reddit": if round_num > state.reddit_current_round: state.reddit_current_round = round_num state.reddit_simulated_hours = simulated_hours # Overall round is the maximum across both platforms if round_num > state.current_round: state.current_round = round_num # Overall time is the maximum across both platforms state.simulated_hours = max(state.twitter_simulated_hours, state.reddit_simulated_hours) continue action = AgentAction( round_num=action_data.get("round", 0), timestamp=action_data.get("timestamp", datetime.now().isoformat()), platform=platform, agent_id=action_data.get("agent_id", 0), agent_name=action_data.get("agent_name", ""), action_type=action_data.get("action_type", ""), action_args=action_data.get("action_args", {}), result=action_data.get("result"), success=action_data.get("success", True), ) state.add_action(action) # Update round number if action.round_num and action.round_num > state.current_round: state.current_round = action.round_num # If graph memory update is enabled, send activity to Zep if graph_updater: graph_updater.add_activity_from_dict(action_data, platform) except json.JSONDecodeError: pass return f.tell() except Exception as e: logger.warning(f"Failed to read action log: {log_path}, error={e}") return position @classmethod def _check_all_platforms_completed(cls, state: SimulationRunState) -> bool: """ Check whether all enabled platforms have completed the simulation. A platform is considered enabled if its corresponding actions.jsonl file exists. Returns: True if all enabled platforms have completed """ sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id) twitter_log = os.path.join(sim_dir, "twitter", "actions.jsonl") reddit_log = os.path.join(sim_dir, "reddit", "actions.jsonl") # Check which platforms are enabled (determined by file existence) twitter_enabled = os.path.exists(twitter_log) reddit_enabled = os.path.exists(reddit_log) # If a platform is enabled but not completed, return False if twitter_enabled and not state.twitter_completed: return False if reddit_enabled and not state.reddit_completed: return False # At least one platform must be enabled and completed return twitter_enabled or reddit_enabled @classmethod def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10): """ Cross-platform process termination (including child processes) Args: process: process to terminate simulation_id: simulation ID (for logging) timeout: seconds to wait for the process to exit """ if IS_WINDOWS: # Windows: use taskkill to terminate the process tree # /F = force terminate, /T = terminate process tree (including children) logger.info(f"Terminating process tree (Windows): simulation={simulation_id}, pid={process.pid}") try: # Attempt graceful termination first subprocess.run( ['taskkill', '/PID', str(process.pid), '/T'], capture_output=True, timeout=5 ) try: process.wait(timeout=timeout) except subprocess.TimeoutExpired: # Force terminate logger.warning(f"Process did not respond; force terminating: {simulation_id}") subprocess.run( ['taskkill', '/F', '/PID', str(process.pid), '/T'], capture_output=True, timeout=5 ) process.wait(timeout=5) except Exception as e: logger.warning(f"taskkill failed, falling back to terminate: {e}") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() else: # Unix: terminate via process group. # Because start_new_session=True was used, the process group ID equals the main process PID. pgid = os.getpgid(process.pid) logger.info(f"Terminating process group (Unix): simulation={simulation_id}, pgid={pgid}") # Send SIGTERM to the entire process group os.killpg(pgid, signal.SIGTERM) try: process.wait(timeout=timeout) except subprocess.TimeoutExpired: # If still alive after timeout, force send SIGKILL logger.warning(f"Process group did not respond to SIGTERM; force terminating: {simulation_id}") os.killpg(pgid, signal.SIGKILL) process.wait(timeout=5) @classmethod def stop_simulation(cls, simulation_id: str) -> SimulationRunState: """Stop a simulation""" state = cls.get_run_state(simulation_id) if not state: raise ValueError(f"Simulation not found: {simulation_id}") if state.runner_status not in [RunnerStatus.RUNNING, RunnerStatus.PAUSED]: raise ValueError(f"Simulation is not running: {simulation_id}, status={state.runner_status}") state.runner_status = RunnerStatus.STOPPING cls._save_run_state(state) # Terminate process process = cls._processes.get(simulation_id) if process and process.poll() is None: try: cls._terminate_process(process, simulation_id) except ProcessLookupError: # Process no longer exists pass except Exception as e: logger.error(f"Failed to terminate process group: {simulation_id}, error={e}") # Fall back to direct process termination try: process.terminate() process.wait(timeout=5) except Exception: process.kill() state.runner_status = RunnerStatus.STOPPED state.twitter_running = False state.reddit_running = False state.completed_at = datetime.now().isoformat() cls._save_run_state(state) # Stop graph memory updater if cls._graph_memory_enabled.get(simulation_id, False): try: ZepGraphMemoryManager.stop_updater(simulation_id) logger.info(f"Graph memory update stopped: simulation_id={simulation_id}") except Exception as e: logger.error(f"Failed to stop graph memory updater: {e}") cls._graph_memory_enabled.pop(simulation_id, None) logger.info(f"Simulation stopped: {simulation_id}") return state @classmethod def _read_actions_from_file( cls, file_path: str, default_platform: Optional[str] = None, platform_filter: Optional[str] = None, agent_id: Optional[int] = None, round_num: Optional[int] = None ) -> List[AgentAction]: """ Read actions from a single action file Args: file_path: action log file path default_platform: default platform (used when the record has no platform field) platform_filter: filter by platform agent_id: filter by agent ID round_num: filter by round number """ if not os.path.exists(file_path): return [] actions = [] with open(file_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line: continue try: data = json.loads(line) # Skip non-action entries (e.g. simulation_start, round_start, round_end events) if "event_type" in data: continue # Skip records without agent_id (non-agent actions) if "agent_id" not in data: continue # Use the record's platform field first; fall back to default record_platform = data.get("platform") or default_platform or "" # Apply filters if platform_filter and record_platform != platform_filter: continue if agent_id is not None and data.get("agent_id") != agent_id: continue if round_num is not None and data.get("round") != round_num: continue actions.append(AgentAction( round_num=data.get("round", 0), timestamp=data.get("timestamp", ""), platform=record_platform, agent_id=data.get("agent_id", 0), agent_name=data.get("agent_name", ""), action_type=data.get("action_type", ""), action_args=data.get("action_args", {}), result=data.get("result"), success=data.get("success", True), )) except json.JSONDecodeError: continue return actions @classmethod def get_all_actions( cls, simulation_id: str, platform: Optional[str] = None, agent_id: Optional[int] = None, round_num: Optional[int] = None ) -> List[AgentAction]: """ Get the complete action history across all platforms (no pagination limit) Args: simulation_id: simulation ID platform: filter by platform (twitter/reddit) agent_id: filter by agent round_num: filter by round number Returns: Complete action list (sorted by timestamp, newest first) """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) actions = [] # Read Twitter action file (platform automatically set to "twitter" from file path) twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl") if not platform or platform == "twitter": actions.extend(cls._read_actions_from_file( twitter_actions_log, default_platform="twitter", # Auto-fill platform field platform_filter=platform, agent_id=agent_id, round_num=round_num )) # Read Reddit action file (platform automatically set to "reddit" from file path) reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl") if not platform or platform == "reddit": actions.extend(cls._read_actions_from_file( reddit_actions_log, default_platform="reddit", # Auto-fill platform field platform_filter=platform, agent_id=agent_id, round_num=round_num )) # If per-platform files don't exist, try reading the old single-file format if not actions: actions_log = os.path.join(sim_dir, "actions.jsonl") actions = cls._read_actions_from_file( actions_log, default_platform=None, # Old format files should have a platform field platform_filter=platform, agent_id=agent_id, round_num=round_num ) # Sort by timestamp (newest first) actions.sort(key=lambda x: x.timestamp, reverse=True) return actions @classmethod def get_actions( cls, simulation_id: str, limit: int = 100, offset: int = 0, platform: Optional[str] = None, agent_id: Optional[int] = None, round_num: Optional[int] = None ) -> List[AgentAction]: """ Get action history (paginated) Args: simulation_id: simulation ID limit: result count limit offset: offset platform: filter by platform agent_id: filter by agent round_num: filter by round number Returns: Action list """ actions = cls.get_all_actions( simulation_id=simulation_id, platform=platform, agent_id=agent_id, round_num=round_num ) # Paginate return actions[offset:offset + limit] @classmethod def get_timeline( cls, simulation_id: str, start_round: int = 0, end_round: Optional[int] = None ) -> List[Dict[str, Any]]: """ Get simulation timeline (summarized by round) Args: simulation_id: simulation ID start_round: starting round end_round: ending round Returns: Summary info per round """ actions = cls.get_actions(simulation_id, limit=10000) # Group by round rounds: Dict[int, Dict[str, Any]] = {} for action in actions: round_num = action.round_num if round_num < start_round: continue if end_round is not None and round_num > end_round: continue if round_num not in rounds: rounds[round_num] = { "round_num": round_num, "twitter_actions": 0, "reddit_actions": 0, "active_agents": set(), "action_types": {}, "first_action_time": action.timestamp, "last_action_time": action.timestamp, } r = rounds[round_num] if action.platform == "twitter": r["twitter_actions"] += 1 else: r["reddit_actions"] += 1 r["active_agents"].add(action.agent_id) r["action_types"][action.action_type] = r["action_types"].get(action.action_type, 0) + 1 r["last_action_time"] = action.timestamp # Convert to list result = [] for round_num in sorted(rounds.keys()): r = rounds[round_num] result.append({ "round_num": round_num, "twitter_actions": r["twitter_actions"], "reddit_actions": r["reddit_actions"], "total_actions": r["twitter_actions"] + r["reddit_actions"], "active_agents_count": len(r["active_agents"]), "active_agents": list(r["active_agents"]), "action_types": r["action_types"], "first_action_time": r["first_action_time"], "last_action_time": r["last_action_time"], }) return result @classmethod def get_agent_stats(cls, simulation_id: str) -> List[Dict[str, Any]]: """ Get statistics for each agent Returns: Agent statistics list """ actions = cls.get_actions(simulation_id, limit=10000) agent_stats: Dict[int, Dict[str, Any]] = {} for action in actions: agent_id = action.agent_id if agent_id not in agent_stats: agent_stats[agent_id] = { "agent_id": agent_id, "agent_name": action.agent_name, "total_actions": 0, "twitter_actions": 0, "reddit_actions": 0, "action_types": {}, "first_action_time": action.timestamp, "last_action_time": action.timestamp, } stats = agent_stats[agent_id] stats["total_actions"] += 1 if action.platform == "twitter": stats["twitter_actions"] += 1 else: stats["reddit_actions"] += 1 stats["action_types"][action.action_type] = stats["action_types"].get(action.action_type, 0) + 1 stats["last_action_time"] = action.timestamp # Sort by total actions descending result = sorted(agent_stats.values(), key=lambda x: x["total_actions"], reverse=True) return result @classmethod def cleanup_simulation_logs(cls, simulation_id: str) -> Dict[str, Any]: """ Clean up simulation run logs (used to force a fresh restart) Deletes the following files: - run_state.json - twitter/actions.jsonl - reddit/actions.jsonl - simulation.log - stdout.log / stderr.log - twitter_simulation.db (simulation database) - reddit_simulation.db (simulation database) - env_status.json (environment status) Note: does NOT delete config files (simulation_config.json) or profile files Args: simulation_id: simulation ID Returns: Cleanup result info """ import shutil sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): return {"success": True, "message": "Simulation directory does not exist; nothing to clean up"} cleaned_files = [] errors = [] # Files to delete (including database files) files_to_delete = [ "run_state.json", "simulation.log", "stdout.log", "stderr.log", "twitter_simulation.db", # Twitter platform database "reddit_simulation.db", # Reddit platform database "env_status.json", # Environment status file ] # Directories to clean (contain action logs) dirs_to_clean = ["twitter", "reddit"] # Delete files for filename in files_to_delete: file_path = os.path.join(sim_dir, filename) if os.path.exists(file_path): try: os.remove(file_path) cleaned_files.append(filename) except Exception as e: errors.append(f"Failed to delete {filename}: {str(e)}") # Clean action logs in platform directories for dir_name in dirs_to_clean: dir_path = os.path.join(sim_dir, dir_name) if os.path.exists(dir_path): actions_file = os.path.join(dir_path, "actions.jsonl") if os.path.exists(actions_file): try: os.remove(actions_file) cleaned_files.append(f"{dir_name}/actions.jsonl") except Exception as e: errors.append(f"Failed to delete {dir_name}/actions.jsonl: {str(e)}") # Remove in-memory run state if simulation_id in cls._run_states: del cls._run_states[simulation_id] logger.info(f"Simulation log cleanup complete: {simulation_id}, deleted files: {cleaned_files}") return { "success": len(errors) == 0, "cleaned_files": cleaned_files, "errors": errors if errors else None } # Flag to prevent duplicate cleanup _cleanup_done = False @classmethod def cleanup_all_simulations(cls): """ Clean up all running simulation processes Called when the server shuts down to ensure all child processes are terminated """ # Prevent duplicate cleanup if cls._cleanup_done: return cls._cleanup_done = True # Check whether there is anything to clean up (avoid printing useless logs for empty process maps) has_processes = bool(cls._processes) has_updaters = bool(cls._graph_memory_enabled) if not has_processes and not has_updaters: return # Nothing to clean up; return silently logger.info("Cleaning up all simulation processes...") # Stop all graph memory updaters first (stop_all logs internally) try: ZepGraphMemoryManager.stop_all() except Exception as e: logger.error(f"Failed to stop graph memory updaters: {e}") cls._graph_memory_enabled.clear() # Copy dict to avoid modifying it while iterating processes = list(cls._processes.items()) for simulation_id, process in processes: try: if process.poll() is None: # Process is still running logger.info(f"Terminating simulation process: {simulation_id}, pid={process.pid}") try: # Use cross-platform process termination cls._terminate_process(process, simulation_id, timeout=5) except (ProcessLookupError, OSError): # Process may already be gone; try direct termination try: process.terminate() process.wait(timeout=3) except Exception: process.kill() # Update run_state.json state = cls.get_run_state(simulation_id) if state: state.runner_status = RunnerStatus.STOPPED state.twitter_running = False state.reddit_running = False state.completed_at = datetime.now().isoformat() state.error = "Server shutdown; simulation was terminated" cls._save_run_state(state) # Also update state.json to set status to stopped try: sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) state_file = os.path.join(sim_dir, "state.json") logger.info(f"Updating state.json: {state_file}") if os.path.exists(state_file): with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) state_data['status'] = 'stopped' state_data['updated_at'] = datetime.now().isoformat() with open(state_file, 'w', encoding='utf-8') as f: json.dump(state_data, f, indent=2, ensure_ascii=False) logger.info(f"Updated state.json status to stopped: {simulation_id}") else: logger.warning(f"state.json not found: {state_file}") except Exception as state_err: logger.warning(f"Failed to update state.json: {simulation_id}, error={state_err}") except Exception as e: logger.error(f"Failed to clean up process: {simulation_id}, error={e}") # Close file handles for simulation_id, file_handle in list(cls._stdout_files.items()): try: if file_handle: file_handle.close() except Exception: pass cls._stdout_files.clear() for simulation_id, file_handle in list(cls._stderr_files.items()): try: if file_handle: file_handle.close() except Exception: pass cls._stderr_files.clear() # Clear in-memory state cls._processes.clear() cls._action_queues.clear() logger.info("Simulation process cleanup complete") @classmethod def register_cleanup(cls): """ Register cleanup function Called at Flask application startup to ensure all simulation processes are terminated when the server shuts down """ global _cleanup_registered if _cleanup_registered: return # In Flask debug mode, only register cleanup in the reloader child process # (the process that actually runs the application). # WERKZEUG_RUN_MAIN=true indicates the reloader child process. # In non-debug mode this env var is absent, and we always register. is_reloader_process = os.environ.get('WERKZEUG_RUN_MAIN') == 'true' is_debug_mode = os.environ.get('FLASK_DEBUG') == '1' or os.environ.get('WERKZEUG_RUN_MAIN') is not None # In debug mode, only register in the reloader child process; in non-debug mode always register if is_debug_mode and not is_reloader_process: _cleanup_registered = True # Mark as registered to prevent child process from trying again return # Save original signal handlers original_sigint = signal.getsignal(signal.SIGINT) original_sigterm = signal.getsignal(signal.SIGTERM) # SIGHUP only exists on Unix (macOS/Linux), not on Windows original_sighup = None has_sighup = hasattr(signal, 'SIGHUP') if has_sighup: original_sighup = signal.getsignal(signal.SIGHUP) def cleanup_handler(signum=None, frame=None): """Signal handler: clean up simulation processes, then invoke original handler""" # Only log when there are processes to clean up if cls._processes or cls._graph_memory_enabled: logger.info(f"Received signal {signum}; starting cleanup...") cls.cleanup_all_simulations() # Invoke the original signal handler so Flask exits normally if signum == signal.SIGINT and callable(original_sigint): original_sigint(signum, frame) elif signum == signal.SIGTERM and callable(original_sigterm): original_sigterm(signum, frame) elif has_sighup and signum == signal.SIGHUP: # SIGHUP: sent when the terminal closes if callable(original_sighup): original_sighup(signum, frame) else: # Default behavior: exit normally sys.exit(0) else: # If original handler is not callable (e.g. SIG_DFL), use default behavior raise KeyboardInterrupt # Register atexit handler (as a fallback) atexit.register(cls.cleanup_all_simulations) # Register signal handlers (main thread only) try: # SIGTERM: default signal sent by kill command signal.signal(signal.SIGTERM, cleanup_handler) # SIGINT: Ctrl+C signal.signal(signal.SIGINT, cleanup_handler) # SIGHUP: terminal close (Unix only) if has_sighup: signal.signal(signal.SIGHUP, cleanup_handler) except ValueError: # Not in the main thread; atexit only logger.warning("Cannot register signal handlers (not in main thread); using atexit only") _cleanup_registered = True @classmethod def get_running_simulations(cls) -> List[str]: """ Get a list of all currently running simulation IDs """ running = [] for sim_id, process in cls._processes.items(): if process.poll() is None: running.append(sim_id) return running # ============== Interview functionality ============== @classmethod def check_env_alive(cls, simulation_id: str) -> bool: """ Check whether the simulation environment is alive (able to receive Interview commands) Args: simulation_id: simulation ID Returns: True if the environment is alive, False if it has been closed """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): return False ipc_client = SimulationIPCClient(sim_dir) return ipc_client.check_env_alive() @classmethod def get_env_status_detail(cls, simulation_id: str) -> Dict[str, Any]: """ Get detailed environment status for a simulation Args: simulation_id: simulation ID Returns: Status detail dict containing: status, twitter_available, reddit_available, timestamp """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) status_file = os.path.join(sim_dir, "env_status.json") default_status = { "status": "stopped", "twitter_available": False, "reddit_available": False, "timestamp": None } if not os.path.exists(status_file): return default_status try: with open(status_file, 'r', encoding='utf-8') as f: status = json.load(f) return { "status": status.get("status", "stopped"), "twitter_available": status.get("twitter_available", False), "reddit_available": status.get("reddit_available", False), "timestamp": status.get("timestamp") } except (json.JSONDecodeError, OSError): return default_status @classmethod def interview_agent( cls, simulation_id: str, agent_id: int, prompt: str, platform: str = None, timeout: float = 60.0 ) -> Dict[str, Any]: """ Interview a single agent Args: simulation_id: simulation ID agent_id: Agent ID prompt: interview question platform: target platform (optional) - "twitter": interview only Twitter platform - "reddit": interview only Reddit platform - None: in dual-platform mode, interview both and return integrated result timeout: timeout in seconds Returns: Interview result dict Raises: ValueError: simulation not found or environment not running TimeoutError: timed out waiting for response """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): raise ValueError(f"Simulation not found: {simulation_id}") ipc_client = SimulationIPCClient(sim_dir) if not ipc_client.check_env_alive(): raise ValueError(f"Simulation environment is not running or has been closed; cannot interview: {simulation_id}") logger.info(f"Sending Interview command: simulation_id={simulation_id}, agent_id={agent_id}, platform={platform}") response = ipc_client.send_interview( agent_id=agent_id, prompt=prompt, platform=platform, timeout=timeout ) if response.status.value == "completed": return { "success": True, "agent_id": agent_id, "prompt": prompt, "result": response.result, "timestamp": response.timestamp } else: return { "success": False, "agent_id": agent_id, "prompt": prompt, "error": response.error, "timestamp": response.timestamp } @classmethod def interview_agents_batch( cls, simulation_id: str, interviews: List[Dict[str, Any]], platform: str = None, timeout: float = 120.0 ) -> Dict[str, Any]: """ Batch-interview multiple agents Args: simulation_id: simulation ID interviews: list of interviews, each containing {"agent_id": int, "prompt": str, "platform": str (optional)} platform: default platform (optional; overridden per-item by each interview's platform) - "twitter": default to Twitter platform only - "reddit": default to Reddit platform only - None: in dual-platform mode, interview each agent on both platforms timeout: timeout in seconds Returns: Batch interview result dict Raises: ValueError: simulation not found or environment not running TimeoutError: timed out waiting for response """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): raise ValueError(f"Simulation not found: {simulation_id}") ipc_client = SimulationIPCClient(sim_dir) if not ipc_client.check_env_alive(): raise ValueError(f"Simulation environment is not running or has been closed; cannot interview: {simulation_id}") logger.info(f"Sending batch Interview command: simulation_id={simulation_id}, count={len(interviews)}, platform={platform}") response = ipc_client.send_batch_interview( interviews=interviews, platform=platform, timeout=timeout ) if response.status.value == "completed": return { "success": True, "interviews_count": len(interviews), "result": response.result, "timestamp": response.timestamp } else: return { "success": False, "interviews_count": len(interviews), "error": response.error, "timestamp": response.timestamp } @classmethod def interview_all_agents( cls, simulation_id: str, prompt: str, platform: str = None, timeout: float = 180.0 ) -> Dict[str, Any]: """ Interview all agents (global interview) Uses the same question to interview all agents in the simulation Args: simulation_id: simulation ID prompt: interview question (same for all agents) platform: target platform (optional) - "twitter": interview only Twitter platform - "reddit": interview only Reddit platform - None: in dual-platform mode, interview each agent on both platforms timeout: timeout in seconds Returns: Global interview result dict """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): raise ValueError(f"Simulation not found: {simulation_id}") # Get all agent info from config file config_path = os.path.join(sim_dir, "simulation_config.json") if not os.path.exists(config_path): raise ValueError(f"Simulation config not found: {simulation_id}") with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) agent_configs = config.get("agent_configs", []) if not agent_configs: raise ValueError(f"No agents in simulation config: {simulation_id}") # Build batch interview list interviews = [] for agent_config in agent_configs: agent_id = agent_config.get("agent_id") if agent_id is not None: interviews.append({ "agent_id": agent_id, "prompt": prompt }) logger.info(f"Sending global Interview command: simulation_id={simulation_id}, agent_count={len(interviews)}, platform={platform}") return cls.interview_agents_batch( simulation_id=simulation_id, interviews=interviews, platform=platform, timeout=timeout ) @classmethod def close_simulation_env( cls, simulation_id: str, timeout: float = 30.0 ) -> Dict[str, Any]: """ Close the simulation environment (without stopping the simulation process) Sends a close-environment command to the simulation, causing it to exit the command-waiting mode gracefully. Args: simulation_id: simulation ID timeout: timeout in seconds Returns: Operation result dict """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) if not os.path.exists(sim_dir): raise ValueError(f"Simulation not found: {simulation_id}") ipc_client = SimulationIPCClient(sim_dir) if not ipc_client.check_env_alive(): return { "success": True, "message": "Environment is already closed" } logger.info(f"Sending close-environment command: simulation_id={simulation_id}") try: response = ipc_client.send_close_env(timeout=timeout) return { "success": response.status.value == "completed", "message": "Close-environment command sent", "result": response.result, "timestamp": response.timestamp } except TimeoutError: # Timeout may mean the environment is already shutting down return { "success": True, "message": "Close-environment command sent (timed out waiting for response; environment may be closing)" } @classmethod def _get_interview_history_from_db( cls, db_path: str, platform_name: str, agent_id: Optional[int] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """Fetch Interview history from a single database""" import sqlite3 if not os.path.exists(db_path): return [] results = [] try: conn = sqlite3.connect(db_path) cursor = conn.cursor() if agent_id is not None: cursor.execute(""" SELECT user_id, info, created_at FROM trace WHERE action = 'interview' AND user_id = ? ORDER BY created_at DESC LIMIT ? """, (agent_id, limit)) else: cursor.execute(""" SELECT user_id, info, created_at FROM trace WHERE action = 'interview' ORDER BY created_at DESC LIMIT ? """, (limit,)) for user_id, info_json, created_at in cursor.fetchall(): try: info = json.loads(info_json) if info_json else {} except json.JSONDecodeError: info = {"raw": info_json} results.append({ "agent_id": user_id, "response": info.get("response", info), "prompt": info.get("prompt", ""), "timestamp": created_at, "platform": platform_name }) conn.close() except Exception as e: logger.error(f"Failed to read Interview history ({platform_name}): {e}") return results @classmethod def get_interview_history( cls, simulation_id: str, platform: str = None, agent_id: Optional[int] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """ Get Interview history records (read from database) Args: simulation_id: simulation ID platform: platform type (reddit/twitter/None) - "reddit": only fetch Reddit platform history - "twitter": only fetch Twitter platform history - None: fetch history from both platforms agent_id: filter by agent ID (optional) limit: per-platform result count limit Returns: Interview history record list """ sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) results = [] # Determine which platforms to query if platform in ("reddit", "twitter"): platforms = [platform] else: # No platform specified: query both platforms = ["twitter", "reddit"] for p in platforms: db_path = os.path.join(sim_dir, f"{p}_simulation.db") platform_results = cls._get_interview_history_from_db( db_path=db_path, platform_name=p, agent_id=agent_id, limit=limit ) results.extend(platform_results) # Sort by timestamp descending results.sort(key=lambda x: x.get("timestamp", ""), reverse=True) # If multiple platforms were queried, cap total count if len(platforms) > 1 and len(results) > limit: results = results[:limit] return results