""" Simulation-related API routes Step 2: Zep entity reading & filtering, OASIS simulation preparation & execution (fully automated) """ import os import traceback from flask import request, jsonify, send_file, Response from . import simulation_bp from .. import get_storage from ..config import Config from ..services.zep_entity_reader import ZepEntityReader from ..services.oasis_profile_generator import OasisProfileGenerator from ..services.simulation_manager import SimulationManager, SimulationStatus from ..services.simulation_runner import SimulationRunner, RunnerStatus from ..utils.logger import get_logger from ..utils.locale import t, get_locale, set_locale from ..models.project import ProjectManager logger = get_logger('mirofish.api.simulation') def _get_simulation_log_path(simulation_id: str) -> str: """Return path to the combined actions log for a simulation.""" sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) for candidate in [ os.path.join(sim_dir, "actions.jsonl"), os.path.join(sim_dir, "twitter", "actions.jsonl"), os.path.join(sim_dir, "reddit", "actions.jsonl"), ]: if os.path.exists(candidate): return candidate return "" # Interview prompt optimization prefix # Adding this prefix prevents the Agent from calling tools and forces a direct text reply INTERVIEW_PROMPT_PREFIX = "Based on your persona, all past memories and actions, reply to me directly in text without calling any tools: " def optimize_interview_prompt(prompt: str) -> str: """ Optimize an interview question by adding a prefix to prevent tool calls. Args: prompt: original question Returns: optimized question """ if not prompt: return prompt # Avoid adding the prefix twice if prompt.startswith(INTERVIEW_PROMPT_PREFIX): return prompt return f"{INTERVIEW_PROMPT_PREFIX}{prompt}" # ============== Entity retrieval endpoints ============== @simulation_bp.route('/entities//count', methods=['GET']) def get_graph_entity_count(graph_id: str): """ Fast entity count for a graph — returns only the filtered_count, no entity data or edge enrichment. """ try: reader = ZepEntityReader() result = reader.filter_defined_entities( graph_id=graph_id, defined_entity_types=None, enrich_with_edges=False ) return jsonify({ "success": True, "data": { "filtered_count": result.filtered_count, "entity_types": list(result.entity_types), } }) except Exception as e: logger.error(f"Failed to get entity count for graph {graph_id}: {e}") return jsonify({"success": False, "error": str(e)}), 500 @simulation_bp.route('/entities/', methods=['GET']) def get_graph_entities(graph_id: str): """ Get all entities in the graph (filtered) Returns only nodes matching predefined entity types (nodes with Labels beyond just "Entity"). Query parameters: entity_types: comma-separated entity type list (optional, for further filtering) enrich: whether to fetch related edge info (default true) """ try: entity_types_str = request.args.get('entity_types', '') entity_types = [t.strip() for t in entity_types_str.split(',') if t.strip()] if entity_types_str else None enrich = request.args.get('enrich', 'true').lower() == 'true' logger.info(f"Fetching graph entities: graph_id={graph_id}, entity_types={entity_types}, enrich={enrich}") reader = ZepEntityReader() result = reader.filter_defined_entities( graph_id=graph_id, defined_entity_types=entity_types, enrich_with_edges=enrich ) return jsonify({ "success": True, "data": result.to_dict() }) except Exception as e: logger.error(f"Failed to get graph entities: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/entities//', methods=['GET']) def get_entity_detail(graph_id: str, entity_uuid: str): """Get detailed information about a single entity""" try: reader = ZepEntityReader() entity = reader.get_entity_with_context(graph_id, entity_uuid) if not entity: return jsonify({ "success": False, "error": t('api.entityNotFound', id=entity_uuid) }), 404 return jsonify({ "success": True, "data": entity.to_dict() }) except Exception as e: logger.error(f"Failed to get entity details: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/entities//by-type/', methods=['GET']) def get_entities_by_type(graph_id: str, entity_type: str): """Get all entities of a specified type""" try: enrich = request.args.get('enrich', 'true').lower() == 'true' reader = ZepEntityReader() entities = reader.get_entities_by_type( graph_id=graph_id, entity_type=entity_type, enrich_with_edges=enrich ) return jsonify({ "success": True, "data": { "entity_type": entity_type, "count": len(entities), "entities": [e.to_dict() for e in entities] } }) except Exception as e: logger.error(f"Failed to get entities: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Simulation management endpoints ============== @simulation_bp.route('/create', methods=['POST']) def create_simulation(): """ Create a new simulation Note: parameters like max_rounds are intelligently generated by the LLM; no manual setup needed. Request (JSON): { "project_id": "proj_xxxx", // required "graph_id": "mirofish_xxxx", // optional, falls back to project graph_id "enable_twitter": true, // optional, default true "enable_reddit": true // optional, default true } Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "project_id": "proj_xxxx", "graph_id": "mirofish_xxxx", "status": "created", "enable_twitter": true, "enable_reddit": true, "created_at": "2025-12-01T10:00:00" } } """ try: data = request.get_json() or {} project_id = data.get('project_id') if not project_id: return jsonify({ "success": False, "error": t('api.requireProjectId') }), 400 project = ProjectManager.get_project(project_id) if not project: return jsonify({ "success": False, "error": t('api.projectNotFound', id=project_id) }), 404 graph_id = data.get('graph_id') or project.get("graph_id") if not graph_id: return jsonify({ "success": False, "error": t('api.graphNotBuilt') }), 400 manager = SimulationManager() state = manager.create_simulation( project_id=project_id, graph_id=graph_id, enable_twitter=data.get('enable_twitter', True), enable_reddit=data.get('enable_reddit', True), ) return jsonify({ "success": True, "data": state.to_dict() }) except Exception as e: logger.error(f"Failed to create simulation: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 def _check_simulation_prepared(simulation_id: str) -> tuple: """ Check whether the simulation preparation is complete. Checks: 1. state.json exists and status is "ready" 2. Required files exist: reddit_profiles.json, twitter_profiles.csv, simulation_config.json Note: run scripts (run_*.py) stay in backend/scripts/ and are no longer copied to the simulation directory. Args: simulation_id: simulation ID Returns: (is_prepared: bool, info: dict) """ import os from ..config import Config simulation_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) # Check if directory exists if not os.path.exists(simulation_dir): return False, {"reason": "Simulation directory does not exist"} # Read state.json first to know which files are required. # profiles_ready = agents done but config not yet generated (Fase A waiting for user). # All other active statuses require simulation_config.json too. state_file = os.path.join(simulation_dir, "state.json") if not os.path.exists(state_file): return False, {"reason": "Missing state.json"} import json try: with open(state_file, 'r', encoding='utf-8') as _f: _state_preview = json.load(_f) _preview_status = _state_preview.get("status", "") except Exception: _preview_status = "" profiles_only_statuses = {"profiles_ready"} if _preview_status in profiles_only_statuses: required_files = ["state.json", "reddit_profiles.json"] else: required_files = [ "state.json", "simulation_config.json", "reddit_profiles.json", "twitter_profiles.csv" ] # Check if files exist existing_files = [] missing_files = [] for f in required_files: file_path = os.path.join(simulation_dir, f) if os.path.exists(file_path): existing_files.append(f) else: missing_files.append(f) if missing_files: return False, { "reason": "Missing required files", "missing_files": missing_files, "existing_files": existing_files } # Check status in state.json (already read above; reuse to avoid double read) try: state_data = _state_preview status = _preview_status config_generated = state_data.get("config_generated", False) # Detailed log logger.debug(f"Checking simulation preparation status: {simulation_id}, status={status}, config_generated={config_generated}") # If config_generated=True and files exist, treat preparation as complete. # All of the following statuses indicate preparation has finished: # - ready: preparation complete, can run # - preparing: if config_generated=True, preparation is done # - running: currently running, meaning preparation completed long ago # - completed: run finished, meaning preparation completed long ago # - stopped: stopped, meaning preparation completed long ago # - failed: run failed (but preparation was complete) # profiles_ready = agents generated, awaiting user confirmation (config not yet generated) # all other statuses require config_generated=True to be considered fully prepared prepared_statuses = ["ready", "preparing", "running", "completed", "stopped", "failed"] if status == "profiles_ready" or (status in prepared_statuses and config_generated): # Get file statistics profiles_file = os.path.join(simulation_dir, "reddit_profiles.json") config_file = os.path.join(simulation_dir, "simulation_config.json") profiles_count = 0 if os.path.exists(profiles_file): with open(profiles_file, 'r', encoding='utf-8') as f: profiles_data = json.load(f) profiles_count = len(profiles_data) if isinstance(profiles_data, list) else 0 # If status is "preparing" but files are done, auto-update status to "ready" if status == "preparing": try: state_data["status"] = "ready" from datetime import datetime state_data["updated_at"] = datetime.now().isoformat() with open(state_file, 'w', encoding='utf-8') as f: json.dump(state_data, f, ensure_ascii=False, indent=2) logger.info(f"Auto-updated simulation status: {simulation_id} preparing -> ready") status = "ready" except Exception as e: logger.warning(f"Failed to auto-update status: {e}") logger.info(f"Simulation {simulation_id} check result: preparation complete (status={status}, config_generated={config_generated})") return True, { "status": status, "entities_count": state_data.get("entities_count", 0), "profiles_count": profiles_count, "entity_types": state_data.get("entity_types", []), "config_generated": config_generated, "created_at": state_data.get("created_at"), "updated_at": state_data.get("updated_at"), "existing_files": existing_files } else: logger.warning(f"Simulation {simulation_id} check result: preparation not complete (status={status}, config_generated={config_generated})") return False, { "reason": f"Status not in prepared list or config_generated is false: status={status}, config_generated={config_generated}", "status": status, "config_generated": config_generated } except Exception as e: return False, {"reason": f"Failed to read state file: {str(e)}"} @simulation_bp.route('/prepare', methods=['POST']) def prepare_simulation(): """ Prepare the simulation environment (async task, all parameters generated by LLM) This is a long-running operation; the endpoint returns task_id immediately. Use GET /api/simulation/prepare/status to poll progress. Features: - Automatically detects completed preparation to avoid regenerating - Returns existing results immediately if preparation is already done - Supports force regeneration (force_regenerate=true) Steps: 1. Check whether preparation has already been completed 2. Read and filter entities from the Zep knowledge graph 3. Generate OASIS Agent Profiles for each entity (with retry) 4. Intelligently generate simulation config via LLM (with retry) 5. Save config files and preset scripts Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "entity_types": ["Student", "PublicFigure"], // optional, specify entity types "use_llm_for_profiles": true, // optional, use LLM to generate personas "parallel_profile_count": 5, // optional, parallel profile generation, default 5 "force_regenerate": false // optional, force regeneration, default false } Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "task_id": "task_xxxx", // returned for new tasks "status": "preparing|ready", "message": "Preparation task started | Preparation already complete", "already_prepared": true|false // whether preparation was already done } } """ import threading import os from ..models.task import TaskManager, TaskStatus from ..config import Config try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": t('api.simulationNotFound', id=simulation_id) }), 404 # Check whether force regeneration is requested force_regenerate = data.get('force_regenerate', False) logger.info(f"Processing /prepare request: simulation_id={simulation_id}, force_regenerate={force_regenerate}") # Check whether preparation is already done (avoid regenerating) if not force_regenerate: logger.debug(f"Checking whether simulation {simulation_id} is already prepared...") is_prepared, prepare_info = _check_simulation_prepared(simulation_id) logger.debug(f"Check result: is_prepared={is_prepared}, prepare_info={prepare_info}") if is_prepared: logger.info(f"Simulation {simulation_id} is already prepared; skipping regeneration") # Return the actual simulation status so the frontend can decide # whether to show Fase A (profiles_ready) or skip it (ready). actual_status = state.status.value if hasattr(state.status, 'value') else str(state.status) return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": actual_status, "message": t('api.alreadyPrepared'), "already_prepared": True, "prepare_info": prepare_info } }) else: logger.info(f"Simulation {simulation_id} is not prepared; starting preparation task") # Get required info from project project = ProjectManager.get_project(state.project_id) if not project: return jsonify({ "success": False, "error": t('api.projectNotFound', id=state.project_id) }), 404 # Get simulation requirement simulation_requirement = project.get("simulation_requirement") or "" if not simulation_requirement: return jsonify({ "success": False, "error": t('api.projectMissingRequirement') }), 400 # Get document text document_text = ProjectManager.get_extracted_text(state.project_id, get_storage()) or "" entity_types_list = data.get('entity_types') max_agents = data.get('max_agents') # optional: limit to top-N most-connected entities if max_agents is not None: try: max_agents = int(max_agents) except (TypeError, ValueError): return jsonify({"success": False, "error": "max_agents must be an integer"}), 400 use_llm_for_profiles = data.get('use_llm_for_profiles', True) parallel_profile_count = data.get('parallel_profile_count', 5) # ========== Synchronously fetch entity count (before background task starts) ========== # This lets the frontend obtain the expected total agent count immediately after calling prepare. try: logger.info(f"Synchronously fetching entity count: graph_id={state.graph_id}") reader = ZepEntityReader() # Quick entity read (no edge info needed, just count) filtered_preview = reader.filter_defined_entities( graph_id=state.graph_id, defined_entity_types=entity_types_list, enrich_with_edges=False # Skip edge info to speed things up ) # Save entity count to state — capped by max_agents if provided raw_count = filtered_preview.filtered_count state.entities_count = min(raw_count, max_agents) if max_agents else raw_count state.entity_types = list(filtered_preview.entity_types) logger.info(f"Expected entity count: {state.entities_count} (raw={raw_count}, max_agents={max_agents})") except Exception as e: logger.warning(f"Failed to synchronously fetch entity count (will retry in background task): {e}") # Failure does not block the rest of the flow; the background task will retry. # Create async task task_manager = TaskManager() task_id = task_manager.create_task( task_type="simulation_prepare", metadata={ "simulation_id": simulation_id, "project_id": state.project_id } ) # Update simulation status (includes pre-fetched entity count) state.status = SimulationStatus.PREPARING manager._save_simulation_state(state) # Capture locale before spawning background thread current_locale = get_locale() # Define background task def run_prepare(): set_locale(current_locale) try: task_manager.update_task( task_id, status=TaskStatus.PROCESSING, progress=0, message=t('progress.startPreparingEnv') ) # Prepare simulation (with progress callback) # Store per-stage progress details stage_details = {} def progress_callback(stage, progress, message, **kwargs): # Calculate overall progress stage_weights = { "reading": (0, 20), # 0-20% "generating_profiles": (20, 70), # 20-70% "generating_config": (70, 90), # 70-90% "copying_scripts": (90, 100) # 90-100% } start, end = stage_weights.get(stage, (0, 100)) current_progress = int(start + (end - start) * progress / 100) # Build detailed progress info stage_names = { "reading": t('progress.readingGraphEntities'), "generating_profiles": t('progress.generatingProfiles'), "generating_config": t('progress.generatingSimConfig'), "copying_scripts": t('progress.preparingScripts') } stage_index = list(stage_weights.keys()).index(stage) + 1 if stage in stage_weights else 1 total_stages = len(stage_weights) # Update stage details stage_details[stage] = { "stage_name": stage_names.get(stage, stage), "stage_progress": progress, "current": kwargs.get("current", 0), "total": kwargs.get("total", 0), "item_name": kwargs.get("item_name", "") } # Build detailed progress info detail = stage_details[stage] progress_detail_data = { "current_stage": stage, "current_stage_name": stage_names.get(stage, stage), "stage_index": stage_index, "total_stages": total_stages, "stage_progress": progress, "current_item": detail["current"], "total_items": detail["total"], "item_description": message } # Build concise message if detail["total"] > 0: detailed_message = ( f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: " f"{detail['current']}/{detail['total']} - {message}" ) else: detailed_message = f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: {message}" task_manager.update_task( task_id, progress=current_progress, message=detailed_message, progress_detail=progress_detail_data ) result_state = manager.prepare_simulation( simulation_id=simulation_id, simulation_requirement=simulation_requirement, document_text=document_text, defined_entity_types=entity_types_list, max_agents=max_agents, use_llm_for_profiles=use_llm_for_profiles, progress_callback=progress_callback, parallel_profile_count=parallel_profile_count ) # Task complete task_manager.complete_task( task_id, result=result_state.to_simple_dict() ) except Exception as e: logger.error(f"Failed to prepare simulation: {str(e)}") task_manager.fail_task(task_id, str(e)) # Update simulation status to failed state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.FAILED state.error = str(e) manager._save_simulation_state(state) # Start background thread thread = threading.Thread(target=run_prepare, daemon=True) thread.start() return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "task_id": task_id, "status": "preparing", "message": t('api.prepareStarted'), "already_prepared": False, "expected_entities_count": state.entities_count, # Expected total agent count "entity_types": state.entity_types # Entity type list } }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 404 except Exception as e: logger.error(f"Failed to start preparation task: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/prepare/status', methods=['POST']) def get_prepare_status(): """ Query preparation task progress Supports two query modes: 1. Query an in-progress task by task_id 2. Check whether preparation is already complete via simulation_id Request (JSON): { "task_id": "task_xxxx", // optional, task_id from prepare "simulation_id": "sim_xxxx" // optional, simulation ID (to check completed preparation) } Returns: { "success": true, "data": { "task_id": "task_xxxx", "status": "processing|completed|ready", "progress": 45, "message": "...", "already_prepared": true|false, // whether preparation was already done "prepare_info": {...} // details when preparation is complete } } """ from ..models.task import TaskManager try: data = request.get_json() or {} task_id = data.get('task_id') simulation_id = data.get('simulation_id') # If simulation_id provided, check whether preparation is already complete if simulation_id: is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: sim_state = SimulationManager().get_simulation(simulation_id) actual_status = sim_state.status.value if sim_state and hasattr(sim_state.status, 'value') else "ready" return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": actual_status, "simulation_status": actual_status, # mirror so frontend pollPrepareStatus can detect profiles_ready "progress": 100, "message": t('api.alreadyPrepared'), "already_prepared": True, "prepare_info": prepare_info } }) # If no task_id, return error if not task_id: if simulation_id: # Has simulation_id but not yet prepared return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": "not_started", "progress": 0, "message": t('api.notStartedPrepare'), "already_prepared": False } }) return jsonify({ "success": False, "error": t('api.requireTaskOrSimId') }), 400 task_manager = TaskManager() task = task_manager.get_task(task_id) if not task: # Task not found; if simulation_id provided, check whether preparation is done if simulation_id: is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: sim_state = SimulationManager().get_simulation(simulation_id) actual_status = sim_state.status.value if sim_state and hasattr(sim_state.status, 'value') else "ready" return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "task_id": task_id, "status": actual_status, "progress": 100, "message": t('api.taskCompletedPrepared'), "already_prepared": True, "prepare_info": prepare_info } }) return jsonify({ "success": False, "error": t('api.taskNotFound', id=task_id) }), 404 task_dict = task task_dict["already_prepared"] = False # Add the real simulation status alongside the task status so the frontend # can show the correct phase without altering task.status (which is used by # pollTaskUntilDone to detect completion). if simulation_id: sim_state = SimulationManager().get_simulation(simulation_id) if sim_state: task_dict["simulation_status"] = sim_state.status.value if hasattr(sim_state.status, 'value') else str(sim_state.status) return jsonify({ "success": True, "data": task_dict }) except Exception as e: logger.error(f"Failed to query task status: {str(e)}") return jsonify({ "success": False, "error": str(e) }), 500 @simulation_bp.route('/', methods=['GET']) def get_simulation(simulation_id: str): """Get simulation status""" try: manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": t('api.simulationNotFound', id=simulation_id) }), 404 result = state.to_dict() # If simulation is ready, attach run instructions if state.status == SimulationStatus.READY: result["run_instructions"] = manager.get_run_instructions(simulation_id) return jsonify({ "success": True, "data": result }) except Exception as e: logger.error(f"Failed to get simulation status: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/list', methods=['GET']) def list_simulations(): """ List all simulations Query parameters: project_id: filter by project ID (optional) """ try: project_id = request.args.get('project_id') manager = SimulationManager() simulations = manager.list_simulations(project_id=project_id) return jsonify({ "success": True, "data": [s.to_dict() for s in simulations], "count": len(simulations) }) except Exception as e: logger.error(f"Failed to list simulations: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 def _get_report_id_for_simulation(simulation_id: str) -> str: """ Get the most recent report_id associated with a simulation. Scans the reports directory for reports matching simulation_id, returning the most recent one (sorted by created_at) if multiple exist. Args: simulation_id: simulation ID Returns: report_id or None """ import json from datetime import datetime # reports directory path: backend/uploads/reports # __file__ is app/api/simulation.py — go up two levels to backend/ reports_dir = os.path.join(os.path.dirname(__file__), '../../uploads/reports') if not os.path.exists(reports_dir): return None matching_reports = [] try: for report_folder in os.listdir(reports_dir): report_path = os.path.join(reports_dir, report_folder) if not os.path.isdir(report_path): continue meta_file = os.path.join(report_path, "meta.json") if not os.path.exists(meta_file): continue try: with open(meta_file, 'r', encoding='utf-8') as f: meta = json.load(f) if meta.get("simulation_id") == simulation_id: matching_reports.append({ "report_id": meta.get("report_id"), "created_at": meta.get("created_at", ""), "status": meta.get("status", "") }) except Exception: continue if not matching_reports: return None # Sort by created_at descending, return the most recent matching_reports.sort(key=lambda x: x.get("created_at", ""), reverse=True) return matching_reports[0].get("report_id") except Exception as e: logger.warning(f"Failed to find report for simulation {simulation_id}: {e}") return None @simulation_bp.route('/history', methods=['GET']) def get_simulation_history(): """ Get historical simulation list (with project details) Used for the homepage history view; returns enriched simulation list including project name, description, etc. Query parameters: limit: result count limit (default 20) Returns: { "success": true, "data": [ { "simulation_id": "sim_xxxx", "project_id": "proj_xxxx", "project_name": "Public Opinion Analysis", "simulation_requirement": "If the university announces...", "status": "completed", "entities_count": 68, "profiles_count": 68, "entity_types": ["Student", "Professor", ...], "created_at": "2024-12-10", "updated_at": "2024-12-10", "total_rounds": 120, "current_round": 120, "report_id": "report_xxxx", "version": "v1.0.2" }, ... ], "count": 7 } """ try: limit = request.args.get('limit', 20, type=int) manager = SimulationManager() simulations = manager.list_simulations()[:limit] # Enrich simulation data, reading only from Simulation files enriched_simulations = [] for sim in simulations: sim_dict = sim.to_dict() # Get simulation config info (read simulation_requirement from simulation_config.json) config = manager.get_simulation_config(sim.simulation_id) if config: sim_dict["simulation_requirement"] = config.get("simulation_requirement", "") time_config = config.get("time_config", {}) sim_dict["total_simulation_hours"] = time_config.get("total_simulation_hours", 0) # Recommended rounds (fallback) recommended_rounds = int( time_config.get("total_simulation_hours", 0) * 60 / max(time_config.get("minutes_per_round", 60), 1) ) else: sim_dict["simulation_requirement"] = "" sim_dict["total_simulation_hours"] = 0 recommended_rounds = 0 # Get run state (read user-configured actual rounds from run_state.json) run_state = SimulationRunner.get_run_state(sim.simulation_id) if run_state: sim_dict["current_round"] = run_state.current_round sim_dict["runner_status"] = run_state.runner_status.value # Use user-configured total_rounds; fall back to recommended rounds if not set sim_dict["total_rounds"] = run_state.total_rounds if run_state.total_rounds > 0 else recommended_rounds else: sim_dict["current_round"] = 0 sim_dict["runner_status"] = "idle" sim_dict["total_rounds"] = recommended_rounds # Get associated project's file list (up to 3) project = ProjectManager.get_project(sim.project_id) if project and project.get("files"): sim_dict["files"] = [ {"filename": f.get("filename", "Unknown file")} for f in project.get("files", [])[:3] ] else: sim_dict["files"] = [] # Get associated report_id (find the most recent report for this simulation) sim_dict["report_id"] = _get_report_id_for_simulation(sim.simulation_id) # Add version sim_dict["version"] = "v1.0.2" # Format date try: created_date = sim_dict.get("created_at", "")[:10] sim_dict["created_date"] = created_date except: sim_dict["created_date"] = "" enriched_simulations.append(sim_dict) return jsonify({ "success": True, "data": enriched_simulations, "count": len(enriched_simulations) }) except Exception as e: logger.error(f"Failed to get simulation history: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//profiles', methods=['GET']) def get_simulation_profiles(simulation_id: str): """ Get Agent Profiles for a simulation Query parameters: platform: platform type (reddit/twitter, default reddit) """ try: platform = request.args.get('platform', 'reddit') manager = SimulationManager() profiles = manager.get_profiles(simulation_id, platform=platform) return jsonify({ "success": True, "data": { "platform": platform, "count": len(profiles), "profiles": profiles } }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 404 except Exception as e: logger.error(f"Failed to get profiles: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//profiles/realtime', methods=['GET']) def get_simulation_profiles_realtime(simulation_id: str): """ Fetch Agent Profiles in real-time (for viewing progress during generation) Differences from /profiles: - Reads files directly, bypassing SimulationManager - Suitable for real-time viewing during generation - Returns extra metadata (e.g. file modification time, whether generation is in progress) Query parameters: platform: platform type (reddit/twitter, default reddit) Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "platform": "reddit", "count": 15, "total_expected": 93, // expected total (if available) "is_generating": true, // whether generation is in progress "file_exists": true, "file_modified_at": "2025-12-04T18:20:00", "profiles": [...] } } """ import json import csv from datetime import datetime try: platform = request.args.get('platform', 'reddit') # Get simulation directory sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) if not os.path.exists(sim_dir): return jsonify({ "success": False, "error": t('api.simulationNotFound', id=simulation_id) }), 404 # Determine file path if platform == "reddit": profiles_file = os.path.join(sim_dir, "reddit_profiles.json") else: profiles_file = os.path.join(sim_dir, "twitter_profiles.csv") # Check if file exists file_exists = os.path.exists(profiles_file) profiles = [] file_modified_at = None if file_exists: # Get file modification time file_stat = os.stat(profiles_file) file_modified_at = datetime.fromtimestamp(file_stat.st_mtime).isoformat() try: if platform == "reddit": with open(profiles_file, 'r', encoding='utf-8') as f: profiles = json.load(f) else: with open(profiles_file, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) profiles = list(reader) except (json.JSONDecodeError, Exception) as e: logger.warning(f"Failed to read profiles file (may be mid-write): {e}") profiles = [] # Check if generation is in progress (via state.json) is_generating = False total_expected = None state_file = os.path.join(sim_dir, "state.json") if os.path.exists(state_file): try: with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) status = state_data.get("status", "") is_generating = status == "preparing" total_expected = state_data.get("entities_count") except Exception: pass return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "platform": platform, "count": len(profiles), "total_expected": total_expected, "is_generating": is_generating, "file_exists": file_exists, "file_modified_at": file_modified_at, "profiles": profiles } }) except Exception as e: logger.error(f"Failed to get profiles in real-time: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config/realtime', methods=['GET']) def get_simulation_config_realtime(simulation_id: str): """ Fetch simulation config in real-time (for viewing progress during generation) Differences from /config: - Reads files directly, bypassing SimulationManager - Suitable for real-time viewing during generation - Returns extra metadata (e.g. file modification time, whether generation is in progress) - Can return partial information even before config generation is complete Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "file_exists": true, "file_modified_at": "2025-12-04T18:20:00", "is_generating": true, // whether generation is in progress "generation_stage": "generating_config", // current generation stage "config": {...} // config content (if it exists) } } """ import json from datetime import datetime try: # Get simulation directory sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) if not os.path.exists(sim_dir): return jsonify({ "success": False, "error": t('api.simulationNotFound', id=simulation_id) }), 404 # Config file path config_file = os.path.join(sim_dir, "simulation_config.json") # Check if file exists file_exists = os.path.exists(config_file) config = None file_modified_at = None if file_exists: # Get file modification time file_stat = os.stat(config_file) file_modified_at = datetime.fromtimestamp(file_stat.st_mtime).isoformat() try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) except (json.JSONDecodeError, Exception) as e: logger.warning(f"Failed to read config file (may be mid-write): {e}") config = None # Check if generation is in progress (via state.json) is_generating = False generation_stage = None config_generated = False state_file = os.path.join(sim_dir, "state.json") if os.path.exists(state_file): try: with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) status = state_data.get("status", "") is_generating = status == "preparing" config_generated = state_data.get("config_generated", False) # Determine current stage if is_generating: if state_data.get("profiles_generated", False): generation_stage = "generating_config" else: generation_stage = "generating_profiles" elif status == "ready": generation_stage = "completed" except Exception: pass # Build response data response_data = { "simulation_id": simulation_id, "file_exists": file_exists, "file_modified_at": file_modified_at, "is_generating": is_generating, "generation_stage": generation_stage, "config_generated": config_generated, "config": config } # If config exists, extract key summary stats if config: response_data["summary"] = { "total_agents": len(config.get("agent_configs", [])), "simulation_hours": config.get("time_config", {}).get("total_simulation_hours"), "initial_posts_count": len(config.get("event_config", {}).get("initial_posts", [])), "hot_topics_count": len(config.get("event_config", {}).get("hot_topics", [])), "has_twitter_config": "twitter_config" in config, "has_reddit_config": "reddit_config" in config, "generated_at": config.get("generated_at"), "llm_model": config.get("llm_model") } return jsonify({ "success": True, "data": response_data }) except Exception as e: logger.error(f"Failed to get config in real-time: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config', methods=['GET']) def get_simulation_config(simulation_id: str): """ Get simulation config (full config intelligently generated by LLM) Returns: - time_config: time configuration (simulation duration, rounds, peak/off-peak periods) - agent_configs: per-agent activity config (activity level, post frequency, stance, etc.) - event_config: event configuration (initial posts, trending topics) - platform_configs: platform configuration - generation_reasoning: LLM's reasoning for the configuration """ try: manager = SimulationManager() config = manager.get_simulation_config(simulation_id) if not config: return jsonify({ "success": False, "error": t('api.configNotFound') }), 404 return jsonify({ "success": True, "data": config }) except Exception as e: logger.error(f"Failed to get config: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config/download', methods=['GET']) def download_simulation_config(simulation_id: str): """Download simulation config file""" try: manager = SimulationManager() sim_dir = manager._get_simulation_dir(simulation_id) config_path = os.path.join(sim_dir, "simulation_config.json") if not os.path.exists(config_path): return jsonify({ "success": False, "error": t('api.configFileNotFound') }), 404 return send_file( config_path, as_attachment=True, download_name="simulation_config.json" ) except Exception as e: logger.error(f"Failed to download config: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/script//download', methods=['GET']) def download_simulation_script(script_name: str): """ Download a simulation run script (generic scripts in backend/scripts/) Valid script_name values: - run_twitter_simulation.py - run_reddit_simulation.py - run_parallel_simulation.py - action_logger.py """ try: # Scripts are in the backend/scripts/ directory scripts_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../scripts')) # Validate script name allowed_scripts = [ "run_twitter_simulation.py", "run_reddit_simulation.py", "run_parallel_simulation.py", "action_logger.py" ] if script_name not in allowed_scripts: return jsonify({ "success": False, "error": t('api.unknownScript', name=script_name, allowed=allowed_scripts) }), 400 script_path = os.path.join(scripts_dir, script_name) if not os.path.exists(script_path): return jsonify({ "success": False, "error": t('api.scriptFileNotFound', name=script_name) }), 404 return send_file( script_path, as_attachment=True, download_name=script_name ) except Exception as e: logger.error(f"Failed to download script: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Profile generation endpoint (standalone use) ============== @simulation_bp.route('/generate-profiles', methods=['POST']) def generate_profiles(): """ Generate OASIS Agent Profiles directly from the graph (without creating a simulation) Request (JSON): { "graph_id": "mirofish_xxxx", // required "entity_types": ["Student"], // optional "use_llm": true, // optional "platform": "reddit" // optional } """ try: data = request.get_json() or {} graph_id = data.get('graph_id') if not graph_id: return jsonify({ "success": False, "error": t('api.requireGraphId') }), 400 entity_types = data.get('entity_types') use_llm = data.get('use_llm', True) platform = data.get('platform', 'reddit') reader = ZepEntityReader() filtered = reader.filter_defined_entities( graph_id=graph_id, defined_entity_types=entity_types, enrich_with_edges=True ) if filtered.filtered_count == 0: return jsonify({ "success": False, "error": t('api.noMatchingEntities') }), 400 generator = OasisProfileGenerator(graph_id=graph_id) profiles = generator.generate_profiles_from_entities( entities=filtered.entities, use_llm=use_llm ) if platform == "reddit": profiles_data = [p.to_reddit_format() for p in profiles] elif platform == "twitter": profiles_data = [p.to_twitter_format() for p in profiles] else: profiles_data = [p.to_dict() for p in profiles] return jsonify({ "success": True, "data": { "platform": platform, "entity_types": list(filtered.entity_types), "count": len(profiles_data), "profiles": profiles_data } }) except Exception as e: logger.error(f"Failed to generate profiles: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Simulation run control endpoints ============== @simulation_bp.route('/start', methods=['POST']) def start_simulation(): """ Start running a simulation Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "platform": "parallel", // optional: twitter / reddit / parallel (default) "max_rounds": 100, // optional: max simulation rounds, to cap long simulations "enable_graph_memory_update": false, // optional: whether to dynamically update Agent activities to Zep graph memory "force": false // optional: force restart (stops running simulation and clears logs) } About force parameter: - When enabled, if the simulation is running or already complete, it is stopped and run logs are cleared - Cleared items: run_state.json, actions.jsonl, simulation.log, etc. - Config files (simulation_config.json) and profile files are NOT cleared - Use when you need to re-run a simulation About enable_graph_memory_update: - When enabled, all Agent activities (posts, comments, likes, etc.) are updated to Zep graph in real time - This lets the graph "remember" the simulation process for later analysis or AI chat - Requires the simulation's associated project to have a valid graph_id - Uses a batch update mechanism to reduce API call count Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "process_pid": 12345, "twitter_running": true, "reddit_running": true, "started_at": "2025-12-01T10:00:00", "graph_memory_update_enabled": true, // whether graph memory update is enabled "force_restarted": true // whether this is a forced restart } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 platform = data.get('platform', 'parallel') max_rounds = data.get('max_rounds') # optional: max simulation rounds enable_graph_memory_update = data.get('enable_graph_memory_update', False) # optional: enable graph memory update force = data.get('force', False) # optional: force restart # Validate max_rounds parameter if max_rounds is not None: try: max_rounds = int(max_rounds) if max_rounds <= 0: return jsonify({ "success": False, "error": t('api.maxRoundsPositive') }), 400 except (ValueError, TypeError): return jsonify({ "success": False, "error": t('api.maxRoundsInvalid') }), 400 if platform not in ['twitter', 'reddit', 'parallel']: return jsonify({ "success": False, "error": t('api.invalidPlatform', platform=platform) }), 400 # Check if simulation is ready manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": t('api.simulationNotFound', id=simulation_id) }), 404 force_restarted = False # Smart status handling: allow restart if preparation is complete if state.status != SimulationStatus.READY: # Check whether preparation has been completed is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: # Preparation is complete; check if a process is still running if state.status == SimulationStatus.RUNNING: # Check whether the simulation process is actually running run_state = SimulationRunner.get_run_state(simulation_id) if run_state and run_state.runner_status.value == "running": # Process is indeed running if force: # Force mode: stop the running simulation logger.info(f"Force mode: stopping running simulation {simulation_id}") try: SimulationRunner.stop_simulation(simulation_id) except Exception as e: logger.warning(f"Warning while stopping simulation: {str(e)}") else: return jsonify({ "success": False, "error": t('api.simRunningForceHint') }), 400 # If in force mode, clear run logs if force: logger.info(f"Force mode: clearing simulation logs for {simulation_id}") cleanup_result = SimulationRunner.cleanup_simulation_logs(simulation_id) if not cleanup_result.get("success"): logger.warning(f"Warning while clearing logs: {cleanup_result.get('errors')}") force_restarted = True # Process does not exist or has ended; reset status to ready logger.info(f"Simulation {simulation_id} preparation complete; resetting status to ready (was: {state.status.value})") state.status = SimulationStatus.READY manager._save_simulation_state(state) else: # Preparation not yet complete return jsonify({ "success": False, "error": t('api.simNotReady', status=state.status.value) }), 400 # Get graph ID (for graph memory update) graph_id = None if enable_graph_memory_update: # Get graph_id from simulation state or project graph_id = state.graph_id if not graph_id: # Try to get from project project = ProjectManager.get_project(state.project_id) if project: graph_id = project.get("graph_id") if not graph_id: return jsonify({ "success": False, "error": t('api.graphIdRequiredForMemory') }), 400 logger.info(f"Graph memory update enabled: simulation_id={simulation_id}, graph_id={graph_id}") # Clone graph for per-simulation isolation graph_id_simulation = None if enable_graph_memory_update and graph_id: try: graph_id_sim = f"mirofish_{simulation_id}_sim" from ..graph import get_graph_backend graph_backend = get_graph_backend() if hasattr(graph_backend, 'clone_graph_sync'): graph_backend.clone_graph_sync(graph_id, graph_id_sim) elif hasattr(graph_backend, 'clone_graph'): from ..graph.graphiti_backend import _run_async _run_async(graph_backend.clone_graph(graph_id, graph_id_sim)) state.graph_id_simulation = graph_id_sim manager._save_simulation_state(state) graph_id_simulation = graph_id_sim logger.info(f"Graph cloned for simulation isolation: {graph_id_sim}") except Exception as e: logger.warning(f"Graph cloning failed, simulation uses shared graph: {e}") # Start simulation — use cloned graph if available, else fall back to original run_state = SimulationRunner.start_simulation( simulation_id=simulation_id, platform=platform, max_rounds=max_rounds, enable_graph_memory_update=enable_graph_memory_update, graph_id=graph_id_simulation or graph_id ) # Update simulation status state.status = SimulationStatus.RUNNING manager._save_simulation_state(state) response_data = run_state.to_dict() if max_rounds: response_data['max_rounds_applied'] = max_rounds response_data['graph_memory_update_enabled'] = enable_graph_memory_update response_data['force_restarted'] = force_restarted if enable_graph_memory_update: response_data['graph_id'] = graph_id if graph_id_simulation: response_data['graph_id_simulation'] = graph_id_simulation return jsonify({ "success": True, "data": response_data }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"Failed to start simulation: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/stop', methods=['POST']) def stop_simulation(): """ Stop the simulation Request (JSON): { "simulation_id": "sim_xxxx" // required, simulation ID } Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "stopped", "completed_at": "2025-12-01T12:00:00" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 run_state = SimulationRunner.stop_simulation(simulation_id) # Update simulation status manager = SimulationManager() state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.PAUSED manager._save_simulation_state(state) return jsonify({ "success": True, "data": run_state.to_dict() }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"Failed to stop simulation: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Real-time status monitoring endpoints ============== @simulation_bp.route('//run-status', methods=['GET']) def get_run_status(simulation_id: str): """ Get real-time simulation run status (for frontend polling) Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "current_round": 5, "total_rounds": 144, "progress_percent": 3.5, "simulated_hours": 2, "total_simulation_hours": 72, "twitter_running": true, "reddit_running": true, "twitter_actions_count": 150, "reddit_actions_count": 200, "total_actions_count": 350, "started_at": "2025-12-01T10:00:00", "updated_at": "2025-12-01T10:30:00" } } """ try: run_state = SimulationRunner.get_run_state(simulation_id) if not run_state: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "runner_status": "idle", "current_round": 0, "total_rounds": 0, "progress_percent": 0, "twitter_actions_count": 0, "reddit_actions_count": 0, "total_actions_count": 0, } }) return jsonify({ "success": True, "data": run_state.to_dict() }) except Exception as e: logger.error(f"Failed to get run status: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//run-status/detail', methods=['GET']) def get_run_status_detail(simulation_id: str): """ Get detailed simulation run status (including all actions) Used for real-time activity display in the frontend. Query parameters: platform: filter by platform (twitter/reddit, optional) Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "current_round": 5, ... "all_actions": [ { "round_num": 5, "timestamp": "2025-12-01T10:30:00", "platform": "twitter", "agent_id": 3, "agent_name": "Agent Name", "action_type": "CREATE_POST", "action_args": {"content": "..."}, "result": null, "success": true }, ... ], "twitter_actions": [...], # All actions on the Twitter platform "reddit_actions": [...] # All actions on the Reddit platform } } """ try: run_state = SimulationRunner.get_run_state(simulation_id) platform_filter = request.args.get('platform') if not run_state: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "runner_status": "idle", "all_actions": [], "twitter_actions": [], "reddit_actions": [] } }) # Get full action list all_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform=platform_filter ) # Get actions per platform twitter_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform="twitter" ) if not platform_filter or platform_filter == "twitter" else [] reddit_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform="reddit" ) if not platform_filter or platform_filter == "reddit" else [] # Get actions for the current round (recent_actions shows only the latest round) current_round = run_state.current_round recent_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform=platform_filter, round_num=current_round ) if current_round > 0 else [] # Get basic status info result = run_state.to_dict() result["all_actions"] = [a.to_dict() for a in all_actions] result["twitter_actions"] = [a.to_dict() for a in twitter_actions] result["reddit_actions"] = [a.to_dict() for a in reddit_actions] result["rounds_count"] = len(run_state.rounds) # recent_actions shows only the current latest round across both platforms result["recent_actions"] = [a.to_dict() for a in recent_actions] return jsonify({ "success": True, "data": result }) except Exception as e: logger.error(f"Failed to get detailed status: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//actions', methods=['GET']) def get_simulation_actions(simulation_id: str): """ Get Agent action history for a simulation Query parameters: limit: result count (default 100) offset: offset (default 0) platform: filter by platform (twitter/reddit) agent_id: filter by Agent ID round_num: filter by round number Returns: { "success": true, "data": { "count": 100, "actions": [...] } } """ try: limit = request.args.get('limit', 100, type=int) offset = request.args.get('offset', 0, type=int) platform = request.args.get('platform') agent_id = request.args.get('agent_id', type=int) round_num = request.args.get('round_num', type=int) actions = SimulationRunner.get_actions( simulation_id=simulation_id, limit=limit, offset=offset, platform=platform, agent_id=agent_id, round_num=round_num ) return jsonify({ "success": True, "data": { "count": len(actions), "actions": [a.to_dict() for a in actions] } }) except Exception as e: logger.error(f"Failed to get action history: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//timeline', methods=['GET']) def get_simulation_timeline(simulation_id: str): """ Get simulation timeline (summarized by round) Used for progress bar and timeline view in the frontend. Query parameters: start_round: starting round (default 0) end_round: ending round (default all) Returns summary info for each round. """ try: start_round = request.args.get('start_round', 0, type=int) end_round = request.args.get('end_round', type=int) timeline = SimulationRunner.get_timeline( simulation_id=simulation_id, start_round=start_round, end_round=end_round ) return jsonify({ "success": True, "data": { "rounds_count": len(timeline), "timeline": timeline } }) except Exception as e: logger.error(f"Failed to get timeline: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//agent-stats', methods=['GET']) def get_agent_stats(simulation_id: str): """ Get statistics for each Agent Used to display agent activity rankings and action distribution in the frontend. """ try: stats = SimulationRunner.get_agent_stats(simulation_id) return jsonify({ "success": True, "data": { "agents_count": len(stats), "stats": stats } }) except Exception as e: logger.error(f"Failed to get Agent stats: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Database query endpoints ============== @simulation_bp.route('//posts', methods=['GET']) def get_simulation_posts(simulation_id: str): """ Get posts from a simulation Query parameters: platform: platform type (twitter/reddit) limit: result count (default 50) offset: offset Returns list of posts (read from SQLite database) """ try: platform = request.args.get('platform', 'reddit') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) db_file = f"{platform}_simulation.db" db_path = os.path.join(sim_dir, db_file) if not os.path.exists(db_path): return jsonify({ "success": True, "data": { "platform": platform, "count": 0, "posts": [], "message": t('api.dbNotExist') } }) import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: cursor.execute(""" SELECT * FROM post ORDER BY created_at DESC LIMIT ? OFFSET ? """, (limit, offset)) posts = [dict(row) for row in cursor.fetchall()] cursor.execute("SELECT COUNT(*) FROM post") total = cursor.fetchone()[0] except sqlite3.OperationalError: posts = [] total = 0 conn.close() return jsonify({ "success": True, "data": { "platform": platform, "total": total, "count": len(posts), "posts": posts } }) except Exception as e: logger.error(f"Failed to get posts: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//comments', methods=['GET']) def get_simulation_comments(simulation_id: str): """ Get comments from a simulation (Reddit only) Query parameters: post_id: filter by post ID (optional) limit: result count offset: offset """ try: post_id = request.args.get('post_id') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) db_path = os.path.join(sim_dir, "reddit_simulation.db") if not os.path.exists(db_path): return jsonify({ "success": True, "data": { "count": 0, "comments": [] } }) import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: if post_id: cursor.execute(""" SELECT * FROM comment WHERE post_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ? """, (post_id, limit, offset)) else: cursor.execute(""" SELECT * FROM comment ORDER BY created_at DESC LIMIT ? OFFSET ? """, (limit, offset)) comments = [dict(row) for row in cursor.fetchall()] except sqlite3.OperationalError: comments = [] conn.close() return jsonify({ "success": True, "data": { "count": len(comments), "comments": comments } }) except Exception as e: logger.error(f"Failed to get comments: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Interview endpoints ============== @simulation_bp.route('/interview', methods=['POST']) def interview_agent(): """ Interview a single Agent Note: this feature requires the simulation environment to be running (after the simulation loop completes it enters command-waiting mode). Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "agent_id": 0, // required, Agent ID "prompt": "What do you think?", // required, interview question "platform": "twitter", // optional, specify platform (twitter/reddit) // if not specified: dual-platform simulation interviews both "timeout": 60 // optional, timeout in seconds, default 60 } Returns (no platform specified, dual-platform mode): { "success": true, "data": { "agent_id": 0, "prompt": "What do you think?", "result": { "agent_id": 0, "prompt": "...", "platforms": { "twitter": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit": {"agent_id": 0, "response": "...", "platform": "reddit"} } }, "timestamp": "2025-12-08T10:00:01" } } Returns (platform specified): { "success": true, "data": { "agent_id": 0, "prompt": "What do you think?", "result": { "agent_id": 0, "response": "I think...", "platform": "twitter", "timestamp": "2025-12-08T10:00:00" }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') agent_id = data.get('agent_id') prompt = data.get('prompt') platform = data.get('platform') # optional: twitter/reddit/None timeout = data.get('timeout', 60) if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 if agent_id is None: return jsonify({ "success": False, "error": t('api.requireAgentId') }), 400 if not prompt: return jsonify({ "success": False, "error": t('api.requirePrompt') }), 400 # Validate platform parameter if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": t('api.invalidInterviewPlatform') }), 400 # Check environment status if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": t('api.envNotRunning') }), 400 # Optimize prompt: add prefix to prevent Agent from calling tools optimized_prompt = optimize_interview_prompt(prompt) result = SimulationRunner.interview_agent( simulation_id=simulation_id, agent_id=agent_id, prompt=optimized_prompt, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": t('api.interviewTimeout', error=str(e)) }), 504 except Exception as e: logger.error(f"Interview failed: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/batch', methods=['POST']) def interview_agents_batch(): """ Batch interview multiple Agents Note: this feature requires the simulation environment to be running. Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "interviews": [ // required, interview list { "agent_id": 0, "prompt": "What do you think of A?", "platform": "twitter" // optional, specify platform for this agent }, { "agent_id": 1, "prompt": "What do you think of B?" // no platform: uses default } ], "platform": "reddit", // optional, default platform (overridden per item) // if not specified: dual-platform each agent is interviewed on both "timeout": 120 // optional, timeout in seconds, default 120 } Returns: { "success": true, "data": { "interviews_count": 2, "result": { "interviews_count": 4, "results": { "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"}, "twitter_1": {"agent_id": 1, "response": "...", "platform": "twitter"}, "reddit_1": {"agent_id": 1, "response": "...", "platform": "reddit"} } }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') interviews = data.get('interviews') platform = data.get('platform') # optional: twitter/reddit/None timeout = data.get('timeout', 120) if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 if not interviews or not isinstance(interviews, list): return jsonify({ "success": False, "error": t('api.requireInterviews') }), 400 # Validate platform parameter if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": t('api.invalidInterviewPlatform') }), 400 # Validate each interview item for i, interview in enumerate(interviews): if 'agent_id' not in interview: return jsonify({ "success": False, "error": t('api.interviewListMissingAgentId', index=i+1) }), 400 if 'prompt' not in interview: return jsonify({ "success": False, "error": t('api.interviewListMissingPrompt', index=i+1) }), 400 # Validate per-item platform (if present) item_platform = interview.get('platform') if item_platform and item_platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": t('api.interviewListInvalidPlatform', index=i+1) }), 400 # Check environment status if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": t('api.envNotRunning') }), 400 # Optimize prompt for each interview item: add prefix to prevent tool calls optimized_interviews = [] for interview in interviews: optimized_interview = interview.copy() optimized_interview['prompt'] = optimize_interview_prompt(interview.get('prompt', '')) optimized_interviews.append(optimized_interview) result = SimulationRunner.interview_agents_batch( simulation_id=simulation_id, interviews=optimized_interviews, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": t('api.batchInterviewTimeout', error=str(e)) }), 504 except Exception as e: logger.error(f"Batch interview failed: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/all', methods=['POST']) def interview_all_agents(): """ Global interview - ask all Agents the same question Note: this feature requires the simulation environment to be running. Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "prompt": "What is your overall take on this?", // required, interview question (same for all agents) "platform": "reddit", // optional, specify platform (twitter/reddit) // if not specified: dual-platform, each agent interviewed on both "timeout": 180 // optional, timeout in seconds, default 180 } Returns: { "success": true, "data": { "interviews_count": 50, "result": { "interviews_count": 100, "results": { "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"}, ... } }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') prompt = data.get('prompt') platform = data.get('platform') # optional: twitter/reddit/None timeout = data.get('timeout', 180) if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 if not prompt: return jsonify({ "success": False, "error": t('api.requirePrompt') }), 400 # Validate platform parameter if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": t('api.invalidInterviewPlatform') }), 400 # Check environment status if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": t('api.envNotRunning') }), 400 # Optimize prompt: add prefix to prevent tool calls optimized_prompt = optimize_interview_prompt(prompt) result = SimulationRunner.interview_all_agents( simulation_id=simulation_id, prompt=optimized_prompt, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": t('api.globalInterviewTimeout', error=str(e)) }), 504 except Exception as e: logger.error(f"Global interview failed: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/history', methods=['POST']) def get_interview_history(): """ Get Interview history records Reads all Interview records from the simulation database. Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "platform": "reddit", // optional, platform type (reddit/twitter) // if not specified, returns history from both platforms "agent_id": 0, // optional, get only this agent's interview history "limit": 100 // optional, result count, default 100 } Returns: { "success": true, "data": { "count": 10, "history": [ { "agent_id": 0, "response": "I think...", "prompt": "What do you think about this?", "timestamp": "2025-12-08T10:00:00", "platform": "reddit" }, ... ] } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') platform = data.get('platform') # if not specified, return history from both platforms agent_id = data.get('agent_id') limit = data.get('limit', 100) if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 history = SimulationRunner.get_interview_history( simulation_id=simulation_id, platform=platform, agent_id=agent_id, limit=limit ) return jsonify({ "success": True, "data": { "count": len(history), "history": history } }) except Exception as e: logger.error(f"Failed to get interview history: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/env-status', methods=['POST']) def get_env_status(): """ Get simulation environment status Checks whether the simulation environment is alive (can receive Interview commands). Request (JSON): { "simulation_id": "sim_xxxx" // required, simulation ID } Returns: { "success": true, "data": { "simulation_id": "sim_xxxx", "env_alive": true, "twitter_available": true, "reddit_available": true, "message": "Environment is running and ready to receive Interview commands" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 env_alive = SimulationRunner.check_env_alive(simulation_id) # Get more detailed status info env_status = SimulationRunner.get_env_status_detail(simulation_id) if env_alive: message = t('api.envRunning') else: message = t('api.envNotRunningShort') return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "env_alive": env_alive, "twitter_available": env_status.get("twitter_available", False), "reddit_available": env_status.get("reddit_available", False), "message": message } }) except Exception as e: logger.error(f"Failed to get environment status: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/close-env', methods=['POST']) def close_simulation_env(): """ Close the simulation environment Sends a close-environment command to the simulation, causing it to exit command-waiting mode gracefully. Note: this is different from /stop, which forcibly terminates the process. This endpoint lets the simulation gracefully close the environment and exit. Request (JSON): { "simulation_id": "sim_xxxx", // required, simulation ID "timeout": 30 // optional, timeout in seconds, default 30 } Returns: { "success": true, "data": { "message": "Environment close command sent", "result": {...}, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') timeout = data.get('timeout', 30) if not simulation_id: return jsonify({ "success": False, "error": t('api.requireSimulationId') }), 400 result = SimulationRunner.close_simulation_env( simulation_id=simulation_id, timeout=timeout ) # Update simulation status manager = SimulationManager() state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.COMPLETED manager._save_simulation_state(state) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"Failed to close environment: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== F2-A: Agent CRUD endpoints ============== @simulation_bp.route('//agent/', methods=['PATCH']) def patch_agent(simulation_id: str, user_id: int): """Update an agent profile (Fase A/B fields). Sets manually_edited=True.""" try: fields = request.get_json() or {} if not fields: return jsonify({"success": False, "error": t('api.requireFields')}), 400 manager = SimulationManager() try: updated = manager.patch_agent_profile(simulation_id, user_id, fields) except ValueError as e: return jsonify({"success": False, "error": str(e)}), 404 except PermissionError as e: return jsonify({"success": False, "error": str(e)}), 403 except LookupError as e: return jsonify({"success": False, "error": str(e)}), 404 except FileNotFoundError as e: return jsonify({"success": False, "error": str(e)}), 404 return jsonify({"success": True, "data": updated}) except Exception as e: logger.error(f"patch_agent failed: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//agent/', methods=['DELETE']) def delete_agent(simulation_id: str, user_id: int): """Remove an agent from the simulation (Fase A only).""" try: manager = SimulationManager() try: manager.delete_agent_profile(simulation_id, user_id) except ValueError as e: return jsonify({"success": False, "error": str(e)}), 404 except PermissionError as e: return jsonify({"success": False, "error": str(e)}), 403 except LookupError as e: return jsonify({"success": False, "error": str(e)}), 404 except FileNotFoundError as e: return jsonify({"success": False, "error": str(e)}), 404 return jsonify({"success": True, "data": {"deleted_user_id": user_id}}) except Exception as e: logger.error(f"delete_agent failed: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//generate-config', methods=['POST']) def generate_config_endpoint(simulation_id: str): """ Transition from Fase A to Fase B. Requires status=profiles_ready. Changes to configuring, starts async config generation. Returns task_id for polling. """ import threading from ..models.task import TaskManager, TaskStatus from ..services.simulation_config_generator import SimulationConfigGenerator try: manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404 if state.status != SimulationStatus.PROFILES_READY: return jsonify({ "success": False, "error": t('api.requireProfilesReady', status=state.status.value) }), 400 project = ProjectManager.get_project(state.project_id) if not project: return jsonify({"success": False, "error": t('api.projectNotFound', id=state.project_id)}), 404 simulation_requirement = project.get("simulation_requirement") or "" document_text = ProjectManager.get_extracted_text(state.project_id, get_storage()) or "" task_manager = TaskManager() task_id = task_manager.create_task( task_type="generate_config", metadata={"simulation_id": simulation_id} ) state.status = SimulationStatus.CONFIGURING manager._save_simulation_state(state) current_locale = get_locale() def run_generate_config(): import json as _json set_locale(current_locale) try: task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0, message=t('progress.generatingSimConfig')) sim_dir = manager._get_simulation_dir(simulation_id) profiles_file = os.path.join(sim_dir, "reddit_profiles.json") with open(profiles_file, 'r', encoding='utf-8') as f: profiles = _json.load(f) from ..services.zep_entity_reader import ZepEntityReader, EntityNode entity_nodes = [] reader = ZepEntityReader() for p in profiles: uuid_ = p.get("source_entity_uuid") if uuid_: try: entity = reader.get_entity_with_context(state.graph_id, uuid_) if entity: entity_nodes.append(entity) except Exception: pass # If Zep lookup yielded nothing, synthesize minimal EntityNodes from profiles # so the config generator can still produce per-agent configs. if not entity_nodes: for p in profiles: user_id = p.get("user_id", 0) entity_nodes.append(EntityNode( uuid=p.get("source_entity_uuid") or f"profile_{user_id}", name=p.get("name") or p.get("username") or f"agent_{user_id}", labels=[p.get("profession") or "Person"], summary=p.get("bio") or "", attributes={ "age": p.get("age"), "gender": p.get("gender"), "country": p.get("country"), "profession": p.get("profession"), "mbti": p.get("mbti"), "stance": p.get("stance"), }, )) gen = SimulationConfigGenerator() params = gen.generate_config( simulation_id=simulation_id, project_id=state.project_id, graph_id=state.graph_id, simulation_requirement=simulation_requirement, document_text=document_text, entities=entity_nodes, enable_twitter=state.enable_twitter, enable_reddit=state.enable_reddit, ) config_file = os.path.join(sim_dir, "simulation_config.json") with open(config_file, 'w', encoding='utf-8') as f: f.write(params.to_json()) state2 = manager.get_simulation(simulation_id) if state2: state2.status = SimulationStatus.READY state2.config_generated = True manager._save_simulation_state(state2) task_manager.complete_task(task_id, result={"status": "prepared"}) except Exception as e: logger.error(f"generate_config background failed: {e}") task_manager.fail_task(task_id, str(e)) state2 = manager.get_simulation(simulation_id) if state2: state2.status = SimulationStatus.PROFILES_READY manager._save_simulation_state(state2) threading.Thread(target=run_generate_config, daemon=True).start() return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}}) except Exception as e: logger.error(f"generate_config endpoint error: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//config', methods=['PATCH']) def patch_simulation_config_endpoint(simulation_id: str): """Update simulation global config parameters (Fase B).""" try: fields = request.get_json() or {} if not fields: return jsonify({"success": False, "error": t('api.requireFields')}), 400 manager = SimulationManager() try: updated = manager.patch_simulation_config(simulation_id, fields) except ValueError as e: return jsonify({"success": False, "error": str(e)}), 404 except PermissionError as e: return jsonify({"success": False, "error": str(e)}), 403 except FileNotFoundError as e: return jsonify({"success": False, "error": str(e)}), 404 return jsonify({"success": True, "data": updated}) except Exception as e: logger.error(f"patch_simulation_config failed: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//clone', methods=['POST']) def clone_simulation(simulation_id: str): """Clone a simulation: copy agent profiles, set status=profiles_ready.""" try: data = request.get_json() or {} project_id = data.get('project_id') if not project_id: return jsonify({"success": False, "error": "project_id is required"}), 400 manager = SimulationManager() try: new_state = manager.clone_simulation(simulation_id, project_id) except LookupError as e: return jsonify({"success": False, "error": str(e)}), 404 except ValueError as e: return jsonify({"success": False, "error": str(e)}), 400 return jsonify({ "success": True, "data": { "new_simulation_id": new_state.simulation_id, "parent_simulation_id": simulation_id, "status": new_state.status.value, } }) except Exception as e: logger.error(f"clone_simulation failed: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//agent', methods=['POST']) def create_agent(simulation_id: str): """ Task 8 — Create a new agent from an entity UUID. Requires status in {profiles_ready, created}. Runs async; returns task_id for polling. """ import json import threading from ..models.task import TaskManager, TaskStatus try: data = request.get_json() or {} source_entity_uuid = data.get("source_entity_uuid") if not source_entity_uuid: return jsonify({"success": False, "error": t('api.requireFields')}), 400 extra_instructions = data.get("extra_instructions") manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404 allowed_statuses = {SimulationStatus.PROFILES_READY, SimulationStatus.CREATED} if state.status not in allowed_statuses: return jsonify({ "success": False, "error": t('api.requireProfilesReady', status=state.status.value) }), 400 task_manager = TaskManager() task_id = task_manager.create_task( task_type="create_agent", metadata={"simulation_id": simulation_id, "source_entity_uuid": source_entity_uuid} ) current_locale = get_locale() def run_create_agent(): set_locale(current_locale) try: task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0, message=t('progress.generatingProfile')) sim_dir = manager._get_simulation_dir(simulation_id) profiles_file = os.path.join(sim_dir, "reddit_profiles.json") # Fetch entity reader = ZepEntityReader() entity = reader.get_entity_with_context(state.graph_id, source_entity_uuid) if not entity: raise ValueError(f"Entity not found: {source_entity_uuid}") # Read current profiles with open(profiles_file, 'r', encoding='utf-8') as f: profiles = json.load(f) # Check for duplicate source_entity_uuid for p in profiles: if p.get("source_entity_uuid") == source_entity_uuid: raise ValueError(f"Agent with source_entity_uuid '{source_entity_uuid}' already exists") # Compute next user_id next_user_id = max((p.get("user_id", -1) for p in profiles), default=-1) + 1 # Generate profile gen = OasisProfileGenerator(graph_id=state.graph_id) profile = gen.generate_profile_from_entity(entity, extra_instructions=extra_instructions) profile.user_id = next_user_id # Convert to dict and append profile_dict = profile.to_reddit_format() profile_dict["user_id"] = next_user_id profile_dict["source_entity_uuid"] = profile.source_entity_uuid profile_dict["source_entity_type"] = profile.source_entity_type profile_dict["manually_edited"] = False profiles.append(profile_dict) # Atomic write: backup → write → delete backup backup_file = profiles_file + ".bak" if os.path.exists(profiles_file): import shutil shutil.copy2(profiles_file, backup_file) try: with open(profiles_file, 'w', encoding='utf-8') as f: json.dump(profiles, f, ensure_ascii=False, indent=2) if os.path.exists(backup_file): os.remove(backup_file) except Exception as write_err: # Restore from backup on failure if os.path.exists(backup_file): import shutil shutil.copy2(backup_file, profiles_file) raise write_err # Update profiles_count in state state2 = manager.get_simulation(simulation_id) if state2: state2.profiles_count = len(profiles) manager._save_simulation_state(state2) task_manager.complete_task(task_id, result={"user_id": next_user_id}) except Exception as e: logger.error(f"create_agent background failed: {e}") task_manager.fail_task(task_id, str(e)) threading.Thread(target=run_create_agent, daemon=True).start() return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}}) except Exception as e: logger.error(f"create_agent endpoint error: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('//agent//regenerate', methods=['POST']) def regenerate_agent(simulation_id: str, user_id: int): """ Task 9 — Regenerate an agent's profile from its source entity. Requires status == profiles_ready and the agent must have source_entity_uuid. Runs async; returns task_id for polling. """ import json import threading from ..models.task import TaskManager, TaskStatus try: data = request.get_json() or {} extra_instructions = data.get("extra_instructions") manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404 if state.status != SimulationStatus.PROFILES_READY: return jsonify({ "success": False, "error": t('api.requireProfilesReady', status=state.status.value) }), 400 # Validate agent synchronously before creating the task sim_dir = manager._get_simulation_dir(simulation_id) profiles_file = os.path.join(sim_dir, "reddit_profiles.json") if not os.path.exists(profiles_file): return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404 with open(profiles_file, 'r', encoding='utf-8') as f: profiles_snapshot = json.load(f) agent_entry = next((p for p in profiles_snapshot if p.get("user_id") == user_id), None) if agent_entry is None: return jsonify({"success": False, "error": t('api.agentNotFound', user_id=user_id)}), 404 # source_entity_uuid may be None for profiles generated before this field was persisted; # the background task will fall back to searching by agent name. task_manager = TaskManager() task_id = task_manager.create_task( task_type="regenerate_agent", metadata={"simulation_id": simulation_id, "user_id": user_id} ) current_locale = get_locale() def run_regenerate_agent(): set_locale(current_locale) try: task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0, message=t('progress.generatingProfile')) with open(profiles_file, 'r', encoding='utf-8') as f: profiles = json.load(f) # Find the agent (re-read to get latest state) agent_idx = None agent = None for i, p in enumerate(profiles): if p.get("user_id") == user_id: agent_idx = i agent = p break if agent_idx is None: raise LookupError(f"Agent with user_id {user_id} not found") source_entity_uuid = agent.get("source_entity_uuid") reader = ZepEntityReader() entity = None if source_entity_uuid: entity = reader.get_entity_with_context(state.graph_id, source_entity_uuid) # Fallback: search by agent name when UUID is missing (profiles generated before # source_entity_uuid was persisted in to_reddit_format). if not entity: agent_name = agent.get("name") or agent.get("username") or "" if agent_name: all_entities_result = reader.filter_defined_entities( graph_id=state.graph_id, defined_entity_types=None, enrich_with_edges=True ) for e in (all_entities_result.entities or []): if e.name and agent_name.lower() in e.name.lower(): entity = e break if not entity: raise ValueError(f"Entity not found for agent {user_id} (uuid={source_entity_uuid!r}, name={agent.get('name')!r})") # Generate new profile gen = OasisProfileGenerator(graph_id=state.graph_id) new_profile = gen.generate_profile_from_entity(entity, extra_instructions=extra_instructions) new_profile.user_id = user_id # Build dict, preserving user_id and resetting manually_edited new_profile_dict = new_profile.to_reddit_format() new_profile_dict["user_id"] = user_id new_profile_dict["source_entity_uuid"] = new_profile.source_entity_uuid new_profile_dict["source_entity_type"] = new_profile.source_entity_type new_profile_dict["manually_edited"] = False profiles[agent_idx] = new_profile_dict # Atomic write backup_file = profiles_file + ".bak" if os.path.exists(profiles_file): import shutil shutil.copy2(profiles_file, backup_file) try: with open(profiles_file, 'w', encoding='utf-8') as f: json.dump(profiles, f, ensure_ascii=False, indent=2) if os.path.exists(backup_file): os.remove(backup_file) except Exception as write_err: if os.path.exists(backup_file): import shutil shutil.copy2(backup_file, profiles_file) raise write_err task_manager.complete_task(task_id, result={"user_id": user_id}) except Exception as e: logger.error(f"regenerate_agent background failed: {e}") task_manager.fail_task(task_id, str(e)) threading.Thread(target=run_regenerate_agent, daemon=True).start() return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}}) except Exception as e: logger.error(f"regenerate_agent endpoint error: {e}") return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500 @simulation_bp.route('/task/', methods=['GET']) def get_task_status(task_id: str): """Generic task status endpoint for polling any simulation-related async task.""" from ..models.task import TaskManager try: task = TaskManager().get_task(task_id) if not task: return jsonify({"success": False, "error": t('api.taskNotFound', id=task_id)}), 404 return jsonify({"success": True, "data": task}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @simulation_bp.route('//download/report', methods=['GET']) def download_simulation_report(simulation_id: str): """Download the final report for a simulation as a Markdown file.""" from ..services.report_agent import ReportManager try: report = ReportManager.get_report_by_simulation(simulation_id) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 if not report: return jsonify({"success": False, "error": t('api.noReportForSim', id=simulation_id)}), 404 content = (report.markdown_content or "").encode("utf-8") filename = f"report_{report.report_id}.md" return Response( content, status=200, headers={ "Content-Disposition": f'attachment; filename="{filename}"', "Content-Type": "text/markdown; charset=utf-8", } ) @simulation_bp.route('//download/log', methods=['GET']) def download_simulation_log(simulation_id: str): """Download the raw simulation actions log (JSONL).""" log_path = _get_simulation_log_path(simulation_id) if not log_path: return jsonify({"success": False, "error": "Log file not found"}), 404 try: with open(log_path, "rb") as f: data = f.read() except OSError as e: return jsonify({"success": False, "error": str(e)}), 500 filename = f"simulation_{simulation_id}_log.jsonl" return Response( data, status=200, headers={ "Content-Disposition": f'attachment; filename="{filename}"', "Content-Type": "application/x-ndjson", } )