MicroFish/backend/app/api/simulation.py

3187 lines
114 KiB
Python

"""
Simulation-related API routes
Step 2: Zep entity reading & filtering, OASIS simulation preparation & execution (fully automated)
"""
import os
import traceback
from flask import request, jsonify, send_file
from . import simulation_bp
from .. import get_storage
from ..config import Config
from ..services.zep_entity_reader import ZepEntityReader
from ..services.oasis_profile_generator import OasisProfileGenerator
from ..services.simulation_manager import SimulationManager, SimulationStatus
from ..services.simulation_runner import SimulationRunner, RunnerStatus
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
from ..models.project import ProjectManager
# Module-level logger shared by every simulation API route in this file.
logger = get_logger('mirofish.api.simulation')
# Prefix glued onto every interview question. It steers the Agent towards
# answering in plain text instead of invoking one of its tools.
INTERVIEW_PROMPT_PREFIX = (
    "Based on your persona, all past memories and actions, "
    "reply to me directly in text without calling any tools: "
)


def optimize_interview_prompt(prompt: str) -> str:
    """
    Apply INTERVIEW_PROMPT_PREFIX to an interview question exactly once.

    Falsy prompts (None, "") are handed back unchanged, and a prompt that
    already starts with the prefix is returned as-is so the prefix is never
    duplicated.

    Args:
        prompt: the raw interview question

    Returns:
        the question carrying the prefix (or the untouched falsy input)
    """
    needs_prefix = bool(prompt) and not prompt.startswith(INTERVIEW_PROMPT_PREFIX)
    return INTERVIEW_PROMPT_PREFIX + prompt if needs_prefix else prompt
# ============== Entity retrieval endpoints ==============
@simulation_bp.route('/entities/<graph_id>', methods=['GET'])
def get_graph_entities(graph_id: str):
    """
    Get all entities in the graph (filtered).

    Returns only nodes matching predefined entity types (nodes with Labels
    beyond just "Entity").

    Query parameters:
        entity_types: comma-separated entity type list (optional, for further
                      filtering). A value containing only separators and
                      whitespace (e.g. ",, ") is treated as "not given".
        enrich: whether to fetch related edge info (default true; any value
                other than "true", case-insensitive, disables it)

    Returns:
        200 {"success": true, "data": <filter result as dict>}
        500 {"success": false, "error": ..., "traceback": ...} on any failure
    """
    try:
        entity_types_str = request.args.get('entity_types', '')
        # The loop variable must not be called `t`: that name is the
        # module-level i18n helper, and reusing it here is misleading.
        # `or None` maps an all-separator value to "no filter" instead of an
        # empty type list.
        entity_types = [
            type_name.strip()
            for type_name in entity_types_str.split(',')
            if type_name.strip()
        ] or None
        enrich = request.args.get('enrich', 'true').lower() == 'true'
        logger.info(f"Fetching graph entities: graph_id={graph_id}, entity_types={entity_types}, enrich={enrich}")
        reader = ZepEntityReader()
        result = reader.filter_defined_entities(
            graph_id=graph_id,
            defined_entity_types=entity_types,
            enrich_with_edges=enrich
        )
        return jsonify({
            "success": True,
            "data": result.to_dict()
        })
    except Exception as e:
        logger.error(f"Failed to get graph entities: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/entities/<graph_id>/<entity_uuid>', methods=['GET'])
def get_entity_detail(graph_id: str, entity_uuid: str):
    """
    Get detailed information about a single entity.

    Responds 200 with the entity (plus context) as a dict, 404 when the reader
    finds no entity with that UUID in the graph, and 500 with a traceback
    when anything raises.
    """
    try:
        entity = ZepEntityReader().get_entity_with_context(graph_id, entity_uuid)
        if entity:
            return jsonify({"success": True, "data": entity.to_dict()})
        not_found_body = {
            "success": False,
            "error": t('api.entityNotFound', id=entity_uuid),
        }
        return jsonify(not_found_body), 404
    except Exception as e:
        logger.error(f"Failed to get entity details: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
@simulation_bp.route('/entities/<graph_id>/by-type/<entity_type>', methods=['GET'])
def get_entities_by_type(graph_id: str, entity_type: str):
    """
    Get all entities of a specified type.

    Query parameters:
        enrich: include related edge info (default "true"; compared
                case-insensitively against "true")

    Responds 200 with {"entity_type", "count", "entities"}; 500 with a
    traceback when anything raises.
    """
    try:
        with_edges = request.args.get('enrich', 'true').lower() == 'true'
        matches = ZepEntityReader().get_entities_by_type(
            graph_id=graph_id,
            entity_type=entity_type,
            enrich_with_edges=with_edges,
        )
        match_count = len(matches)
        serialized = [item.to_dict() for item in matches]
        payload = {
            "entity_type": entity_type,
            "count": match_count,
            "entities": serialized,
        }
        return jsonify({"success": True, "data": payload})
    except Exception as e:
        logger.error(f"Failed to get entities: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
# ============== Simulation management endpoints ==============
@simulation_bp.route('/create', methods=['POST'])
def create_simulation():
    """
    Create a new simulation.

    Note: parameters like max_rounds are intelligently generated by the LLM;
    no manual setup needed.

    Request (JSON):
    {
        "project_id": "proj_xxxx",      // required
        "graph_id": "mirofish_xxxx",    // optional, falls back to project graph_id
        "enable_twitter": true,         // optional, default true
        "enable_reddit": true           // optional, default true
    }

    Returns:
    {
        "success": true,
        "data": {
            "simulation_id": "sim_xxxx",
            "project_id": "proj_xxxx",
            "graph_id": "mirofish_xxxx",
            "status": "created",
            "enable_twitter": true,
            "enable_reddit": true,
            "created_at": "2025-12-01T10:00:00"
        }
    }

    Error responses: 400 when project_id is missing or no graph_id can be
    determined, 404 when the project does not exist, 500 (with traceback) on
    any other failure.
    """
    def _reject(message, status_code):
        # Uniform error envelope for the validation guards below.
        return jsonify({"success": False, "error": message}), status_code

    try:
        payload = request.get_json() or {}

        project_id = payload.get('project_id')
        if not project_id:
            return _reject(t('api.requireProjectId'), 400)

        project = ProjectManager.get_project(project_id)
        if not project:
            return _reject(t('api.projectNotFound', id=project_id), 404)

        graph_id = payload.get('graph_id') or project.get("graph_id")
        if not graph_id:
            return _reject(t('api.graphNotBuilt'), 400)

        created_state = SimulationManager().create_simulation(
            project_id=project_id,
            graph_id=graph_id,
            enable_twitter=payload.get('enable_twitter', True),
            enable_reddit=payload.get('enable_reddit', True),
        )
        return jsonify({"success": True, "data": created_state.to_dict()})
    except Exception as e:
        logger.error(f"Failed to create simulation: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
def _check_simulation_prepared(simulation_id: str) -> tuple:
    """
    Check whether the simulation preparation is complete.

    Checks:
    1. simulation_id names a directory directly inside
       Config.OASIS_SIMULATION_DATA_DIR (no "../" or absolute-path escapes)
    2. Required files exist: state.json, simulation_config.json,
       reddit_profiles.json, twitter_profiles.csv
    3. state.json has a truthy "config_generated" and a "status" in
       ready / preparing / running / completed / stopped / failed

    Side effect: a simulation found in status "preparing" with
    config_generated=True is promoted to "ready" (and "updated_at" refreshed)
    in state.json.

    Note: run scripts (run_*.py) stay in backend/scripts/ and are no longer
    copied to the simulation directory.

    Args:
        simulation_id: simulation ID (may come straight from a request body)

    Returns:
        (is_prepared: bool, info: dict). When is_prepared is False,
        info["reason"] explains why.
    """
    import json
    from datetime import datetime

    base_dir = os.path.abspath(Config.OASIS_SIMULATION_DATA_DIR)
    simulation_dir = os.path.abspath(os.path.join(base_dir, str(simulation_id)))
    # simulation_id can arrive unvalidated from a JSON body (/prepare/status).
    # Refuse anything that resolves outside the data directory before reading,
    # or potentially rewriting, state.json there.
    if os.path.dirname(simulation_dir) != base_dir:
        return False, {"reason": "Invalid simulation ID"}

    # Check if directory exists
    if not os.path.exists(simulation_dir):
        return False, {"reason": "Simulation directory does not exist"}

    # Required file list (scripts excluded; scripts are in backend/scripts/)
    required_files = [
        "state.json",
        "simulation_config.json",
        "reddit_profiles.json",
        "twitter_profiles.csv"
    ]
    existing_files = []
    missing_files = []
    for name in required_files:
        if os.path.exists(os.path.join(simulation_dir, name)):
            existing_files.append(name)
        else:
            missing_files.append(name)
    if missing_files:
        return False, {
            "reason": "Missing required files",
            "missing_files": missing_files,
            "existing_files": existing_files
        }

    # Read state.json; any failure here is reported as a state-file problem.
    state_file = os.path.join(simulation_dir, "state.json")
    try:
        with open(state_file, 'r', encoding='utf-8') as f:
            state_data = json.load(f)
    except Exception as e:
        return False, {"reason": f"Failed to read state file: {str(e)}"}
    if not isinstance(state_data, dict):
        return False, {"reason": "Failed to read state file: state.json is not a JSON object"}

    status = state_data.get("status", "")
    config_generated = state_data.get("config_generated", False)
    logger.debug(f"Checking simulation preparation status: {simulation_id}, status={status}, config_generated={config_generated}")

    # With config_generated=True and all files present, each of these statuses
    # means preparation has finished:
    # - ready: preparation complete, can run
    # - preparing: config_generated=True, so preparation is done
    # - running / completed / stopped / failed: the run stage was reached,
    #   so preparation completed earlier
    prepared_statuses = ["ready", "preparing", "running", "completed", "stopped", "failed"]
    if status not in prepared_statuses or not config_generated:
        logger.warning(f"Simulation {simulation_id} check result: preparation not complete (status={status}, config_generated={config_generated})")
        return False, {
            "reason": f"Status not in prepared list or config_generated is false: status={status}, config_generated={config_generated}",
            "status": status,
            "config_generated": config_generated
        }

    # Count profiles. A parse failure gets its own reason instead of being
    # misreported as a state-file problem.
    profiles_count = 0
    profiles_file = os.path.join(simulation_dir, "reddit_profiles.json")
    if os.path.exists(profiles_file):
        try:
            with open(profiles_file, 'r', encoding='utf-8') as f:
                profiles_data = json.load(f)
        except Exception as e:
            return False, {"reason": f"Failed to read profiles file: {str(e)}"}
        profiles_count = len(profiles_data) if isinstance(profiles_data, list) else 0

    # If status is "preparing" but the files are done, auto-promote to "ready".
    # Best effort: a failed write is logged and the original status is reported.
    if status == "preparing":
        try:
            state_data["status"] = "ready"
            state_data["updated_at"] = datetime.now().isoformat()
            with open(state_file, 'w', encoding='utf-8') as f:
                json.dump(state_data, f, ensure_ascii=False, indent=2)
            logger.info(f"Auto-updated simulation status: {simulation_id} preparing -> ready")
            status = "ready"
        except Exception as e:
            logger.warning(f"Failed to auto-update status: {e}")

    logger.info(f"Simulation {simulation_id} check result: preparation complete (status={status}, config_generated={config_generated})")
    return True, {
        "status": status,
        "entities_count": state_data.get("entities_count", 0),
        "profiles_count": profiles_count,
        "entity_types": state_data.get("entity_types", []),
        "config_generated": config_generated,
        "created_at": state_data.get("created_at"),
        "updated_at": state_data.get("updated_at"),
        "existing_files": existing_files
    }
@simulation_bp.route('/prepare', methods=['POST'])
def prepare_simulation():
    """
    Prepare the simulation environment (async task, all parameters generated by LLM).

    This is a long-running operation; the endpoint returns task_id immediately.
    Use POST /api/simulation/prepare/status to poll progress.

    Features:
    - Automatically detects completed preparation to avoid regenerating
    - Returns existing results immediately if preparation is already done
    - Supports force regeneration (force_regenerate=true)

    Steps:
    1. Check whether preparation has already been completed
    2. Read and filter entities from the Zep knowledge graph
    3. Generate OASIS Agent Profiles for each entity (with retry)
    4. Intelligently generate simulation config via LLM (with retry)
    5. Save config files and preset scripts

    Request (JSON):
    {
        "simulation_id": "sim_xxxx",                  // required, simulation ID
        "entity_types": ["Student", "PublicFigure"],  // optional, specify entity types
        "max_agents": 50,                             // optional integer: limit to top-N most-connected entities
        "use_llm_for_profiles": true,                 // optional, use LLM to generate personas
        "parallel_profile_count": 5,                  // optional positive integer, default 5
        "force_regenerate": false                     // optional, force regeneration, default false
    }

    Returns:
    {
        "success": true,
        "data": {
            "simulation_id": "sim_xxxx",
            "task_id": "task_xxxx",              // returned for new tasks
            "status": "preparing|ready",
            "message": "Preparation task started | Preparation already complete",
            "already_prepared": true|false,     // whether preparation was already done
            "prepare_info": {...},              // only when already prepared
            "expected_entities_count": 68,      // only for new tasks; may be null if the pre-count failed
            "entity_types": [...]               // only for new tasks
        }
    }

    Error responses: 400 for a missing simulation_id, a project without a
    simulation requirement, a non-integer max_agents, or a non-positive /
    non-integer parallel_profile_count; 404 for an unknown simulation or
    project (and for ValueError raised while setting up); 500 otherwise.
    """
    import threading
    from ..models.task import TaskManager, TaskStatus

    try:
        data = request.get_json() or {}
        simulation_id = data.get('simulation_id')
        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        # Check whether force regeneration is requested
        force_regenerate = data.get('force_regenerate', False)
        logger.info(f"Processing /prepare request: simulation_id={simulation_id}, force_regenerate={force_regenerate}")

        # Check whether preparation is already done (avoid regenerating)
        if not force_regenerate:
            logger.debug(f"Checking whether simulation {simulation_id} is already prepared...")
            is_prepared, prepare_info = _check_simulation_prepared(simulation_id)
            logger.debug(f"Check result: is_prepared={is_prepared}, prepare_info={prepare_info}")
            if is_prepared:
                logger.info(f"Simulation {simulation_id} is already prepared; skipping regeneration")
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "status": "ready",
                        "message": t('api.alreadyPrepared'),
                        "already_prepared": True,
                        "prepare_info": prepare_info
                    }
                })
            logger.info(f"Simulation {simulation_id} is not prepared; starting preparation task")
        # NOTE(review): a second /prepare call while a task is still running
        # (status "preparing", config not yet generated) starts another
        # background preparation for the same simulation.

        # Get required info from project
        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": t('api.projectNotFound', id=state.project_id)
            }), 404

        # Get simulation requirement
        simulation_requirement = project.get("simulation_requirement") or ""
        if not simulation_requirement:
            return jsonify({
                "success": False,
                "error": t('api.projectMissingRequirement')
            }), 400

        # Get document text
        document_text = ProjectManager.get_extracted_text(state.project_id, get_storage()) or ""
        entity_types_list = data.get('entity_types')

        max_agents = data.get('max_agents')  # optional: limit to top-N most-connected entities
        if max_agents is not None:
            try:
                max_agents = int(max_agents)
            except (TypeError, ValueError):
                return jsonify({"success": False, "error": "max_agents must be an integer"}), 400

        use_llm_for_profiles = data.get('use_llm_for_profiles', True)

        # Validate up front, like max_agents. Otherwise an arbitrary value
        # would be handed unchecked to manager.prepare_simulation on the
        # background thread, after the simulation was already set to PREPARING.
        parallel_profile_count = data.get('parallel_profile_count', 5)
        try:
            parallel_profile_count = int(parallel_profile_count)
        except (TypeError, ValueError):
            parallel_profile_count = None
        if parallel_profile_count is None or parallel_profile_count < 1:
            return jsonify({
                "success": False,
                "error": "parallel_profile_count must be a positive integer"
            }), 400

        # ========== Synchronously fetch entity count (before background task starts) ==========
        # This lets the frontend obtain the expected total agent count immediately after calling prepare.
        try:
            logger.info(f"Synchronously fetching entity count: graph_id={state.graph_id}")
            reader = ZepEntityReader()
            # Quick entity read (no edge info needed, just count)
            filtered_preview = reader.filter_defined_entities(
                graph_id=state.graph_id,
                defined_entity_types=entity_types_list,
                enrich_with_edges=False  # Skip edge info to speed things up
            )
            # Save entity count to state (so frontend can fetch it immediately)
            state.entities_count = filtered_preview.filtered_count
            state.entity_types = list(filtered_preview.entity_types)
            logger.info(f"Expected entity count: {filtered_preview.filtered_count}, types: {filtered_preview.entity_types}")
        except Exception as e:
            # Failure does not block the rest of the flow; the background task will retry.
            logger.warning(f"Failed to synchronously fetch entity count (will retry in background task): {e}")

        # Create async task
        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="simulation_prepare",
            metadata={
                "simulation_id": simulation_id,
                "project_id": state.project_id
            }
        )

        # Update simulation status (includes pre-fetched entity count)
        state.status = SimulationStatus.PREPARING
        manager._save_simulation_state(state)

        # Capture locale before spawning background thread
        current_locale = get_locale()

        def run_prepare():
            """Background body: run the preparation and report progress/outcome to the task."""
            set_locale(current_locale)
            try:
                task_manager.update_task(
                    task_id,
                    status=TaskStatus.PROCESSING,
                    progress=0,
                    message=t('progress.startPreparingEnv')
                )

                # Store per-stage progress details
                stage_details = {}

                def progress_callback(stage, progress, message, **kwargs):
                    # Overall-progress window (start%, end%) for each stage
                    stage_weights = {
                        "reading": (0, 20),              # 0-20%
                        "generating_profiles": (20, 70), # 20-70%
                        "generating_config": (70, 90),   # 70-90%
                        "copying_scripts": (90, 100)     # 90-100%
                    }
                    start, end = stage_weights.get(stage, (0, 100))
                    current_progress = int(start + (end - start) * progress / 100)

                    stage_names = {
                        "reading": t('progress.readingGraphEntities'),
                        "generating_profiles": t('progress.generatingProfiles'),
                        "generating_config": t('progress.generatingSimConfig'),
                        "copying_scripts": t('progress.preparingScripts')
                    }
                    stage_index = list(stage_weights.keys()).index(stage) + 1 if stage in stage_weights else 1
                    total_stages = len(stage_weights)

                    # Update stage details
                    stage_details[stage] = {
                        "stage_name": stage_names.get(stage, stage),
                        "stage_progress": progress,
                        "current": kwargs.get("current", 0),
                        "total": kwargs.get("total", 0),
                        "item_name": kwargs.get("item_name", "")
                    }

                    # Build detailed progress info
                    detail = stage_details[stage]
                    progress_detail_data = {
                        "current_stage": stage,
                        "current_stage_name": stage_names.get(stage, stage),
                        "stage_index": stage_index,
                        "total_stages": total_stages,
                        "stage_progress": progress,
                        "current_item": detail["current"],
                        "total_items": detail["total"],
                        "item_description": message
                    }

                    # Build concise message
                    if detail["total"] > 0:
                        detailed_message = (
                            f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: "
                            f"{detail['current']}/{detail['total']} - {message}"
                        )
                    else:
                        detailed_message = f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: {message}"

                    task_manager.update_task(
                        task_id,
                        progress=current_progress,
                        message=detailed_message,
                        progress_detail=progress_detail_data
                    )

                result_state = manager.prepare_simulation(
                    simulation_id=simulation_id,
                    simulation_requirement=simulation_requirement,
                    document_text=document_text,
                    defined_entity_types=entity_types_list,
                    max_agents=max_agents,
                    use_llm_for_profiles=use_llm_for_profiles,
                    progress_callback=progress_callback,
                    parallel_profile_count=parallel_profile_count
                )

                # Task complete
                task_manager.complete_task(
                    task_id,
                    result=result_state.to_simple_dict()
                )
            except Exception as e:
                # This runs on a background thread: the log is the only place
                # the stack trace can end up, so include it.
                logger.error(f"Failed to prepare simulation: {str(e)}\n{traceback.format_exc()}")
                # Record the failure. Each step is guarded separately so one
                # failing does not silently skip the other or kill the thread
                # with an unlogged error.
                try:
                    task_manager.fail_task(task_id, str(e))
                except Exception as task_error:
                    logger.error(f"Failed to mark task {task_id} as failed: {task_error}")
                try:
                    failed_state = manager.get_simulation(simulation_id)
                    if failed_state:
                        failed_state.status = SimulationStatus.FAILED
                        failed_state.error = str(e)
                        manager._save_simulation_state(failed_state)
                except Exception as save_error:
                    logger.error(f"Failed to mark simulation {simulation_id} as failed: {save_error}")

        # Start background thread
        thread = threading.Thread(target=run_prepare, daemon=True)
        thread.start()

        return jsonify({
            "success": True,
            "data": {
                "simulation_id": simulation_id,
                "task_id": task_id,
                "status": "preparing",
                "message": t('api.prepareStarted'),
                "already_prepared": False,
                "expected_entities_count": state.entities_count,  # Expected total agent count
                "entity_types": state.entity_types  # Entity type list
            }
        })
    except ValueError as e:
        return jsonify({
            "success": False,
            "error": str(e)
        }), 404
    except Exception as e:
        logger.error(f"Failed to start preparation task: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/prepare/status', methods=['POST'])
def get_prepare_status():
    """
    Query preparation task progress.

    Supports two query modes:
    1. Query an in-progress task by task_id
    2. Check whether preparation is already complete via simulation_id

    Request (JSON):
    {
        "task_id": "task_xxxx",        // optional, task_id from prepare
        "simulation_id": "sim_xxxx"    // optional, simulation ID (to check completed preparation)
    }

    Returns:
    {
        "success": true,
        "data": {
            "task_id": "task_xxxx",
            "status": "processing|completed|ready|not_started",
            "progress": 45,
            "message": "...",
            "already_prepared": true|false,   // whether preparation was already done
            "prepare_info": {...}             // details when preparation is complete
        }
    }

    When the task is found, "data" is the task record returned by
    TaskManager.get_task with "already_prepared": false added. Its other
    fields depend on TaskManager.

    Error responses: 400 when neither task_id nor simulation_id is given,
    404 when the task is unknown and preparation is not complete, 500 on
    any other failure.
    """
    from ..models.task import TaskManager
    try:
        data = request.get_json() or {}
        task_id = data.get('task_id')
        simulation_id = data.get('simulation_id')

        # If simulation_id provided, check whether preparation is already complete
        if simulation_id:
            is_prepared, prepare_info = _check_simulation_prepared(simulation_id)
            if is_prepared:
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "status": "ready",
                        "progress": 100,
                        "message": t('api.alreadyPrepared'),
                        "already_prepared": True,
                        "prepare_info": prepare_info
                    }
                })

        # No task_id: report "not started" for a known simulation_id, else 400
        if not task_id:
            if simulation_id:
                # Has simulation_id but not yet prepared
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "status": "not_started",
                        "progress": 0,
                        "message": t('api.notStartedPrepare'),
                        "already_prepared": False
                    }
                })
            return jsonify({
                "success": False,
                "error": t('api.requireTaskOrSimId')
            }), 400

        task_manager = TaskManager()
        task = task_manager.get_task(task_id)
        if not task:
            # Task not found; if simulation_id provided, check whether preparation is done
            if simulation_id:
                is_prepared, prepare_info = _check_simulation_prepared(simulation_id)
                if is_prepared:
                    return jsonify({
                        "success": True,
                        "data": {
                            "simulation_id": simulation_id,
                            "task_id": task_id,
                            "status": "ready",
                            "progress": 100,
                            "message": t('api.taskCompletedPrepared'),
                            "already_prepared": True,
                            "prepare_info": prepare_info
                        }
                    })
            return jsonify({
                "success": False,
                "error": t('api.taskNotFound', id=task_id)
            }), 404

        # Copy before annotating. get_task may hand back the task manager's
        # own record (TODO confirm), and writing the response-only
        # "already_prepared" flag into it would leak into stored task state.
        task_dict = dict(task)
        task_dict["already_prepared"] = False
        return jsonify({
            "success": True,
            "data": task_dict
        })
    except Exception as e:
        logger.error(f"Failed to query task status: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e)
        }), 500
@simulation_bp.route('/<simulation_id>', methods=['GET'])
def get_simulation(simulation_id: str):
    """
    Get simulation status.

    Responds 200 with the simulation state as a dict; when the simulation is
    READY the manager's "run_instructions" are attached too. 404 for an
    unknown simulation, 500 with a traceback on any failure.
    """
    try:
        sim_manager = SimulationManager()
        sim_state = sim_manager.get_simulation(simulation_id)
        if not sim_state:
            missing_body = {
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id),
            }
            return jsonify(missing_body), 404

        body = sim_state.to_dict()
        is_ready = sim_state.status == SimulationStatus.READY
        if is_ready:
            body["run_instructions"] = sim_manager.get_run_instructions(simulation_id)
        return jsonify({"success": True, "data": body})
    except Exception as e:
        logger.error(f"Failed to get simulation status: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
@simulation_bp.route('/list', methods=['GET'])
def list_simulations():
    """
    List all simulations.

    Query parameters:
        project_id: filter by project ID (optional)

    Responds with {"success", "data": [...], "count"}; 500 with a traceback
    on any failure.
    """
    try:
        project_filter = request.args.get('project_id')
        found = SimulationManager().list_simulations(project_id=project_filter)
        serialized = [sim.to_dict() for sim in found]
        return jsonify({
            "success": True,
            "data": serialized,
            "count": len(found),
        })
    except Exception as e:
        logger.error(f"Failed to list simulations: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
def _get_report_id_for_simulation(simulation_id: str) -> str:
"""
Get the most recent report_id associated with a simulation.
Scans the reports directory for reports matching simulation_id,
returning the most recent one (sorted by created_at) if multiple exist.
Args:
simulation_id: simulation ID
Returns:
report_id or None
"""
import json
from datetime import datetime
# reports directory path: backend/uploads/reports
# __file__ is app/api/simulation.py — go up two levels to backend/
reports_dir = os.path.join(os.path.dirname(__file__), '../../uploads/reports')
if not os.path.exists(reports_dir):
return None
matching_reports = []
try:
for report_folder in os.listdir(reports_dir):
report_path = os.path.join(reports_dir, report_folder)
if not os.path.isdir(report_path):
continue
meta_file = os.path.join(report_path, "meta.json")
if not os.path.exists(meta_file):
continue
try:
with open(meta_file, 'r', encoding='utf-8') as f:
meta = json.load(f)
if meta.get("simulation_id") == simulation_id:
matching_reports.append({
"report_id": meta.get("report_id"),
"created_at": meta.get("created_at", ""),
"status": meta.get("status", "")
})
except Exception:
continue
if not matching_reports:
return None
# Sort by created_at descending, return the most recent
matching_reports.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return matching_reports[0].get("report_id")
except Exception as e:
logger.warning(f"Failed to find report for simulation {simulation_id}: {e}")
return None
@simulation_bp.route('/history', methods=['GET'])
def get_simulation_history():
    """
    Get historical simulation list (with project details).

    Used for the homepage history view; returns enriched simulation list
    including project name, description, etc.

    Query parameters:
        limit: result count limit (default 20; negative values are treated as 0)

    Returns:
    {
        "success": true,
        "data": [
            {
                "simulation_id": "sim_xxxx",
                "project_id": "proj_xxxx",
                "project_name": "Public Opinion Analysis",
                "simulation_requirement": "If the university announces...",
                "status": "completed",
                "entities_count": 68,
                "profiles_count": 68,
                "entity_types": ["Student", "Professor", ...],
                "created_at": "2024-12-10",
                "updated_at": "2024-12-10",
                "total_rounds": 120,
                "current_round": 120,
                "report_id": "report_xxxx",
                "version": "v1.0.2"
            },
            ...
        ],
        "count": 7
    }

    Missing or null time settings in simulation_config.json fall back to
    0 hours / 60 minutes per round instead of failing the whole listing.
    """
    try:
        # A negative limit would slice from the end of the list.
        limit = max(request.args.get('limit', 20, type=int), 0)
        manager = SimulationManager()
        simulations = manager.list_simulations()[:limit]

        # Enrich simulation data, reading only from Simulation files
        enriched_simulations = []
        for sim in simulations:
            sim_dict = sim.to_dict()

            # Get simulation config info (read simulation_requirement from simulation_config.json)
            config = manager.get_simulation_config(sim.simulation_id)
            if config:
                sim_dict["simulation_requirement"] = config.get("simulation_requirement", "")
                # Tolerate a null/non-object time_config and null values: any
                # of them used to raise TypeError/AttributeError and 500 the
                # whole history list.
                time_config = config.get("time_config")
                if not isinstance(time_config, dict):
                    time_config = {}
                total_hours = time_config.get("total_simulation_hours", 0)
                if total_hours is None:
                    total_hours = 0
                minutes_per_round = time_config.get("minutes_per_round", 60)
                if minutes_per_round is None:
                    minutes_per_round = 60
                sim_dict["total_simulation_hours"] = total_hours
                # Recommended rounds (fallback); max(..., 1) guards against zero.
                recommended_rounds = int(total_hours * 60 / max(minutes_per_round, 1))
            else:
                sim_dict["simulation_requirement"] = ""
                sim_dict["total_simulation_hours"] = 0
                recommended_rounds = 0

            # Get run state (read user-configured actual rounds from run_state.json)
            run_state = SimulationRunner.get_run_state(sim.simulation_id)
            if run_state:
                sim_dict["current_round"] = run_state.current_round
                sim_dict["runner_status"] = run_state.runner_status.value
                # Use user-configured total_rounds; fall back to recommended rounds if not set
                sim_dict["total_rounds"] = run_state.total_rounds if run_state.total_rounds > 0 else recommended_rounds
            else:
                sim_dict["current_round"] = 0
                sim_dict["runner_status"] = "idle"
                sim_dict["total_rounds"] = recommended_rounds

            # Get associated project's file list (up to 3)
            project = ProjectManager.get_project(sim.project_id)
            if project and project.get("files"):
                sim_dict["files"] = [
                    {"filename": f.get("filename", "Unknown file")}
                    for f in project.get("files", [])[:3]
                ]
            else:
                sim_dict["files"] = []

            # Get associated report_id (find the most recent report for this simulation)
            sim_dict["report_id"] = _get_report_id_for_simulation(sim.simulation_id)

            # Add version
            sim_dict["version"] = "v1.0.2"

            # Date part (YYYY-MM-DD) of an ISO created_at; "" when it is absent
            # or not a string (replaces the former bare except).
            created_at = sim_dict.get("created_at", "")
            sim_dict["created_date"] = created_at[:10] if isinstance(created_at, str) else ""

            enriched_simulations.append(sim_dict)

        return jsonify({
            "success": True,
            "data": enriched_simulations,
            "count": len(enriched_simulations)
        })
    except Exception as e:
        logger.error(f"Failed to get simulation history: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/profiles', methods=['GET'])
def get_simulation_profiles(simulation_id: str):
    """
    Get Agent Profiles for a simulation.

    Query parameters:
        platform: platform type (reddit/twitter, default reddit)

    A ValueError from the manager is reported as 404; any other error as
    500 with a traceback.
    """
    try:
        requested_platform = request.args.get('platform', 'reddit')
        profile_list = SimulationManager().get_profiles(
            simulation_id, platform=requested_platform
        )
        payload = {
            "platform": requested_platform,
            "count": len(profile_list),
            "profiles": profile_list,
        }
        return jsonify({"success": True, "data": payload})
    except ValueError as e:
        return jsonify({"success": False, "error": str(e)}), 404
    except Exception as e:
        logger.error(f"Failed to get profiles: {str(e)}")
        failure_body = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
        return jsonify(failure_body), 500
@simulation_bp.route('/<simulation_id>/profiles/realtime', methods=['GET'])
def get_simulation_profiles_realtime(simulation_id: str):
    """
    Fetch Agent Profiles in real-time (for viewing progress during generation).

    Differences from /profiles:
    - Reads files directly, bypassing SimulationManager
    - Suitable for real-time viewing during generation
    - Returns extra metadata (e.g. file modification time, whether generation is in progress)

    Query parameters:
        platform: platform type (reddit/twitter, default reddit). Any value
                  other than "reddit" reads twitter_profiles.csv.

    Returns:
    {
        "success": true,
        "data": {
            "simulation_id": "sim_xxxx",
            "platform": "reddit",
            "count": 15,
            "total_expected": 93,       // expected total (if available)
            "is_generating": true,      // whether generation is in progress
            "file_exists": true,
            "file_modified_at": "2025-12-04T18:20:00",
            "profiles": [...]
        }
    }

    A profiles file that cannot be parsed (e.g. caught mid-write) yields an
    empty "profiles" list rather than an error. 404 if the simulation
    directory does not exist; 500 with a traceback on other failures.
    """
    import csv
    import json
    from datetime import datetime

    try:
        platform = request.args.get('platform', 'reddit')

        # Get simulation directory
        sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id)
        if not os.path.exists(sim_dir):
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        is_reddit = platform == "reddit"
        profiles_file = os.path.join(
            sim_dir, "reddit_profiles.json" if is_reddit else "twitter_profiles.csv"
        )

        # A single stat() both tests existence and yields the mtime, so a file
        # removed between an exists() check and a stat() can't cause a 500.
        file_modified_at = None
        try:
            file_modified_at = datetime.fromtimestamp(os.stat(profiles_file).st_mtime).isoformat()
            file_exists = True
        except OSError:
            file_exists = False

        profiles = []
        if file_exists:
            try:
                if is_reddit:
                    with open(profiles_file, 'r', encoding='utf-8') as f:
                        profiles = json.load(f)
                else:
                    # The csv module requires newline='' so quoted fields with
                    # embedded line breaks are read back unchanged.
                    with open(profiles_file, 'r', encoding='utf-8', newline='') as f:
                        profiles = list(csv.DictReader(f))
            except Exception as e:
                logger.warning(f"Failed to read profiles file (may be mid-write): {e}")
                profiles = []

        # Check if generation is in progress (via state.json); best effort.
        is_generating = False
        total_expected = None
        state_file = os.path.join(sim_dir, "state.json")
        if os.path.exists(state_file):
            try:
                with open(state_file, 'r', encoding='utf-8') as f:
                    state_data = json.load(f)
                status = state_data.get("status", "")
                is_generating = status == "preparing"
                total_expected = state_data.get("entities_count")
            except Exception:
                # state.json may be mid-write; report defaults instead of failing.
                pass

        return jsonify({
            "success": True,
            "data": {
                "simulation_id": simulation_id,
                "platform": platform,
                "count": len(profiles),
                "total_expected": total_expected,
                "is_generating": is_generating,
                "file_exists": file_exists,
                "file_modified_at": file_modified_at,
                "profiles": profiles
            }
        })
    except Exception as e:
        logger.error(f"Failed to get profiles in real-time: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/config/realtime', methods=['GET'])
def get_simulation_config_realtime(simulation_id: str):
    """
    Fetch simulation config in real-time (for viewing progress during generation)

    Differences from /config:
    - Reads files directly, bypassing SimulationManager
    - Suitable for real-time viewing during generation
    - Returns extra metadata (e.g. file modification time, whether generation is in progress)
    - Can return partial information even before config generation is complete

    Because this endpoint is polled while other code may be rewriting the
    files, a config/state file that vanishes or cannot be parsed is reported
    as "not available yet" instead of producing a 500.

    Returns:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "file_exists": true,
                "file_modified_at": "2025-12-04T18:20:00",
                "is_generating": true,          // whether generation is in progress
                "generation_stage": "generating_config",  // current generation stage
                "config_generated": false,
                "config": {...},                // config content (if it exists)
                "summary": {...}                // only when config is a non-empty object
            }
        }

    Responds 404 when the simulation directory does not exist.
    """
    import json
    from datetime import datetime

    try:
        sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id)
        if not os.path.exists(sim_dir):
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        config_file = os.path.join(sim_dir, "simulation_config.json")
        file_exists = os.path.exists(config_file)
        config = None
        file_modified_at = None

        if file_exists:
            try:
                file_modified_at = datetime.fromtimestamp(
                    os.path.getmtime(config_file)
                ).isoformat()
            except OSError:
                # The file was removed between the exists() check and the stat.
                file_exists = False

        if file_exists:
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError,
                # e.g. when the writer has only flushed part of the file.
                logger.warning(f"Failed to read config file (may be mid-write): {e}")
                config = None

        # Derive generation progress from state.json (best effort)
        is_generating = False
        generation_stage = None
        config_generated = False
        state_file = os.path.join(sim_dir, "state.json")
        if os.path.exists(state_file):
            try:
                with open(state_file, 'r', encoding='utf-8') as f:
                    state_data = json.load(f)
            except (OSError, ValueError) as e:
                # Polled frequently, so keep this at debug level.
                logger.debug(f"Failed to read state file for {simulation_id}: {e}")
                state_data = None

            if isinstance(state_data, dict):
                status = state_data.get("status", "")
                is_generating = status == "preparing"
                config_generated = state_data.get("config_generated", False)
                if is_generating:
                    # Profiles are produced before the config
                    if state_data.get("profiles_generated", False):
                        generation_stage = "generating_config"
                    else:
                        generation_stage = "generating_profiles"
                elif status == "ready":
                    generation_stage = "completed"

        response_data = {
            "simulation_id": simulation_id,
            "file_exists": file_exists,
            "file_modified_at": file_modified_at,
            "is_generating": is_generating,
            "generation_stage": generation_stage,
            "config_generated": config_generated,
            "config": config
        }

        # Key summary stats; `or {}` / `or []` tolerate explicit nulls in a
        # partially generated config.
        if isinstance(config, dict) and config:
            time_config = config.get("time_config") or {}
            event_config = config.get("event_config") or {}
            response_data["summary"] = {
                "total_agents": len(config.get("agent_configs") or []),
                "simulation_hours": time_config.get("total_simulation_hours"),
                "initial_posts_count": len(event_config.get("initial_posts") or []),
                "hot_topics_count": len(event_config.get("hot_topics") or []),
                "has_twitter_config": "twitter_config" in config,
                "has_reddit_config": "reddit_config" in config,
                "generated_at": config.get("generated_at"),
                "llm_model": config.get("llm_model")
            }

        return jsonify({
            "success": True,
            "data": response_data
        })

    except Exception as e:
        logger.error(f"Failed to get config in real-time: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/config', methods=['GET'])
def get_simulation_config(simulation_id: str):
    """
    Return the complete LLM-generated configuration of a simulation.

    The configuration (as stored by SimulationManager) includes:
    - time_config: simulation duration, rounds, peak/off-peak periods
    - agent_configs: per-agent activity level, post frequency, stance, etc.
    - event_config: initial posts and trending topics
    - platform_configs: platform configuration
    - generation_reasoning: the LLM's reasoning for the configuration

    Responds 404 when no configuration is available for the simulation.
    """
    try:
        sim_config = SimulationManager().get_simulation_config(simulation_id)
        if sim_config:
            return jsonify({
                "success": True,
                "data": sim_config
            })
        return jsonify({
            "success": False,
            "error": t('api.configNotFound')
        }), 404
    except Exception as exc:
        logger.error(f"Failed to get config: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/config/download', methods=['GET'])
def download_simulation_config(simulation_id: str):
    """
    Send the simulation's simulation_config.json as a file attachment.

    Responds 404 when the file has not been written yet.
    """
    try:
        config_path = os.path.join(
            SimulationManager()._get_simulation_dir(simulation_id),
            "simulation_config.json",
        )
        if os.path.exists(config_path):
            return send_file(
                config_path,
                as_attachment=True,
                download_name="simulation_config.json"
            )
        return jsonify({
            "success": False,
            "error": t('api.configFileNotFound')
        }), 404
    except Exception as exc:
        logger.error(f"Failed to download config: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/script/<script_name>/download', methods=['GET'])
def download_simulation_script(script_name: str):
    """
    Serve one of the generic simulation runner scripts from backend/scripts/.

    Only names on a fixed allow-list are served, so ``script_name`` cannot be
    used to fetch arbitrary files:
    - run_twitter_simulation.py
    - run_reddit_simulation.py
    - run_parallel_simulation.py
    - action_logger.py

    Responds 400 for a name outside the allow-list and 404 when an allowed
    script is missing on disk.
    """
    permitted_scripts = [
        "run_twitter_simulation.py",
        "run_reddit_simulation.py",
        "run_parallel_simulation.py",
        "action_logger.py"
    ]
    try:
        if script_name not in permitted_scripts:
            return jsonify({
                "success": False,
                "error": t('api.unknownScript', name=script_name, allowed=permitted_scripts)
            }), 400

        # backend/scripts/ relative to this module (backend/app/api/)
        here = os.path.dirname(__file__)
        target = os.path.join(os.path.abspath(os.path.join(here, '../../scripts')), script_name)

        if os.path.exists(target):
            return send_file(
                target,
                as_attachment=True,
                download_name=script_name
            )
        return jsonify({
            "success": False,
            "error": t('api.scriptFileNotFound', name=script_name)
        }), 404
    except Exception as exc:
        logger.error(f"Failed to download script: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
# ============== Profile generation endpoint (standalone use) ==============
@simulation_bp.route('/generate-profiles', methods=['POST'])
def generate_profiles():
    """
    Build OASIS agent profiles straight from a graph, without creating a simulation.

    Request (JSON):
        {
            "graph_id": "mirofish_xxxx",   // required
            "entity_types": ["Student"],   // optional
            "use_llm": true,               // optional, default true
            "platform": "reddit"           // optional, default "reddit"
        }

    "reddit" and "twitter" produce the platform-specific profile format; any
    other value produces the generic dict form. Responds 400 when graph_id is
    missing or no entity matches.
    """
    try:
        body = request.get_json() or {}

        graph_id = body.get('graph_id')
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.requireGraphId')
            }), 400

        entity_types = body.get('entity_types')
        use_llm = body.get('use_llm', True)
        platform = body.get('platform', 'reddit')

        matched = ZepEntityReader().filter_defined_entities(
            graph_id=graph_id,
            defined_entity_types=entity_types,
            enrich_with_edges=True
        )
        if matched.filtered_count == 0:
            return jsonify({
                "success": False,
                "error": t('api.noMatchingEntities')
            }), 400

        generated = OasisProfileGenerator(graph_id=graph_id).generate_profiles_from_entities(
            entities=matched.entities,
            use_llm=use_llm
        )

        # Pick the serializer name once, then apply it to every profile
        if platform == "reddit":
            serializer = "to_reddit_format"
        elif platform == "twitter":
            serializer = "to_twitter_format"
        else:
            serializer = "to_dict"
        serialized = [getattr(profile, serializer)() for profile in generated]

        return jsonify({
            "success": True,
            "data": {
                "platform": platform,
                "entity_types": list(matched.entity_types),
                "count": len(serialized),
                "profiles": serialized
            }
        })

    except Exception as exc:
        logger.error(f"Failed to generate profiles: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
# ============== Simulation run control endpoints ==============
@simulation_bp.route('/start', methods=['POST'])
def start_simulation():
    """
    Start running a simulation

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",          // required, simulation ID
            "platform": "parallel",               // optional: twitter / reddit / parallel (default)
            "max_rounds": 100,                    // optional: max simulation rounds, to cap long simulations
            "enable_graph_memory_update": false,  // optional: whether to dynamically update Agent activities to Zep graph memory
            "force": false                        // optional: force restart (stops running simulation and clears logs)
        }

    About force parameter:
        - When enabled, if the simulation is running or already complete, it is stopped and run logs are cleared
        - Cleared items: run_state.json, actions.jsonl, simulation.log, etc.
        - Config files (simulation_config.json) and profile files are NOT cleared
        - Use when you need to re-run a simulation

    About enable_graph_memory_update:
        - When enabled, all Agent activities (posts, comments, likes, etc.) are updated to Zep graph in real time
        - This lets the graph "remember" the simulation process for later analysis or AI chat
        - Requires the simulation's associated project to have a valid graph_id
        - Uses a batch update mechanism to reduce API call count
        - If the graph backend supports clone_graph, the graph is cloned to
          "mirofish_<simulation_id>_sim" and recorded on the simulation state;
          a clone failure is logged and the start continues

    Returns:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "runner_status": "running",
                "process_pid": 12345,
                "twitter_running": true,
                "reddit_running": true,
                "started_at": "2025-12-01T10:00:00",
                "graph_memory_update_enabled": true,  // whether graph memory update is enabled
                "force_restarted": true               // whether this is a forced restart
            }
        }
    """

    def _run_coroutine_blocking(make_coro):
        """Run the coroutine produced by ``make_coro()`` to completion and return its result.

        ``asyncio.run`` refuses to start inside a thread that already has a
        running event loop, so in that case the coroutine is executed on a
        fresh loop in a single worker thread. The coroutine is only created in
        the branch that awaits it, so none is left un-awaited, and errors raised
        by the coroutine itself propagate instead of triggering a second run.
        """
        import asyncio
        import concurrent.futures

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True

        if not loop_running:
            return asyncio.run(make_coro())
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, make_coro()).result()

    try:
        data = request.get_json() or {}

        simulation_id = data.get('simulation_id')
        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        platform = data.get('platform', 'parallel')
        max_rounds = data.get('max_rounds')  # optional: max simulation rounds
        enable_graph_memory_update = data.get('enable_graph_memory_update', False)  # optional: enable graph memory update
        force = data.get('force', False)  # optional: force restart

        # Validate max_rounds parameter
        if max_rounds is not None:
            try:
                max_rounds = int(max_rounds)
            except (ValueError, TypeError):
                return jsonify({
                    "success": False,
                    "error": t('api.maxRoundsInvalid')
                }), 400
            if max_rounds <= 0:
                return jsonify({
                    "success": False,
                    "error": t('api.maxRoundsPositive')
                }), 400

        if platform not in ['twitter', 'reddit', 'parallel']:
            return jsonify({
                "success": False,
                "error": t('api.invalidPlatform', platform=platform)
            }), 400

        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        force_restarted = False

        # Smart status handling: a non-READY simulation may be restarted as
        # long as its preparation artifacts are complete.
        if state.status != SimulationStatus.READY:
            is_prepared, _prepare_info = _check_simulation_prepared(simulation_id)
            if not is_prepared:
                return jsonify({
                    "success": False,
                    "error": t('api.simNotReady', status=state.status.value)
                }), 400

            if state.status == SimulationStatus.RUNNING:
                # The stored status may be stale; ask the runner whether the
                # process is actually alive.
                run_state = SimulationRunner.get_run_state(simulation_id)
                if run_state and run_state.runner_status.value == "running":
                    if not force:
                        return jsonify({
                            "success": False,
                            "error": t('api.simRunningForceHint')
                        }), 400
                    logger.info(f"Force mode: stopping running simulation {simulation_id}")
                    try:
                        SimulationRunner.stop_simulation(simulation_id)
                    except Exception as e:
                        logger.warning(f"Warning while stopping simulation: {str(e)}")

            if force:
                logger.info(f"Force mode: clearing simulation logs for {simulation_id}")
                cleanup_result = SimulationRunner.cleanup_simulation_logs(simulation_id)
                if not cleanup_result.get("success"):
                    logger.warning(f"Warning while clearing logs: {cleanup_result.get('errors')}")
                force_restarted = True

            # Process does not exist or has ended; reset status to ready
            logger.info(f"Simulation {simulation_id} preparation complete; resetting status to ready (was: {state.status.value})")
            state.status = SimulationStatus.READY
            manager._save_simulation_state(state)

        # Resolve graph ID (for graph memory update): simulation state first, then project
        graph_id = None
        if enable_graph_memory_update:
            graph_id = state.graph_id
            if not graph_id:
                project = ProjectManager.get_project(state.project_id)
                if project:
                    graph_id = project.get("graph_id")

            if not graph_id:
                return jsonify({
                    "success": False,
                    "error": t('api.graphIdRequiredForMemory')
                }), 400

            logger.info(f"Graph memory update enabled: simulation_id={simulation_id}, graph_id={graph_id}")

        # Clone graph for per-simulation isolation (best effort)
        graph_id_simulation = None
        if enable_graph_memory_update and graph_id:
            try:
                graph_id_sim = f"mirofish_{simulation_id}_sim"
                from ..graph import get_graph_backend
                graph_backend = get_graph_backend()
                if hasattr(graph_backend, 'clone_graph'):
                    _run_coroutine_blocking(
                        lambda: graph_backend.clone_graph(graph_id, graph_id_sim)
                    )
                    state.graph_id_simulation = graph_id_sim
                    manager._save_simulation_state(state)
                    graph_id_simulation = graph_id_sim
                    logger.info(f"Graph cloned for simulation isolation: {graph_id_sim}")
            except Exception as e:
                logger.warning(f"Graph cloning failed, simulation uses shared graph: {e}")

        # NOTE(review): the runner is given the original graph_id, not
        # graph_id_simulation — confirm whether SimulationRunner reads
        # state.graph_id_simulation itself, otherwise the clone is unused.
        run_state = SimulationRunner.start_simulation(
            simulation_id=simulation_id,
            platform=platform,
            max_rounds=max_rounds,
            enable_graph_memory_update=enable_graph_memory_update,
            graph_id=graph_id
        )

        state.status = SimulationStatus.RUNNING
        manager._save_simulation_state(state)

        response_data = run_state.to_dict()
        if max_rounds:
            response_data['max_rounds_applied'] = max_rounds
        response_data['graph_memory_update_enabled'] = enable_graph_memory_update
        response_data['force_restarted'] = force_restarted
        if enable_graph_memory_update:
            response_data['graph_id'] = graph_id
            if graph_id_simulation:
                response_data['graph_id_simulation'] = graph_id_simulation

        return jsonify({
            "success": True,
            "data": response_data
        })

    except ValueError as e:
        return jsonify({
            "success": False,
            "error": str(e)
        }), 400

    except Exception as e:
        logger.error(f"Failed to start simulation: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/stop', methods=['POST'])
def stop_simulation():
    """
    Stop a running simulation and mark its stored state as PAUSED.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx"   // required, simulation ID
        }

    Returns the runner's state after stopping, e.g.:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "runner_status": "stopped",
                "completed_at": "2025-12-01T12:00:00"
            }
        }

    A ValueError from the runner is reported as 400.
    """
    try:
        payload = request.get_json() or {}
        sim_id = payload.get('simulation_id')
        if not sim_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        stopped_state = SimulationRunner.stop_simulation(sim_id)

        # Mirror the stop in the persisted simulation status (if it exists)
        manager = SimulationManager()
        sim_state = manager.get_simulation(sim_id)
        if sim_state:
            sim_state.status = SimulationStatus.PAUSED
            manager._save_simulation_state(sim_state)

        return jsonify({
            "success": True,
            "data": stopped_state.to_dict()
        })
    except ValueError as exc:
        return jsonify({
            "success": False,
            "error": str(exc)
        }), 400
    except Exception as exc:
        logger.error(f"Failed to stop simulation: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
# ============== Real-time status monitoring endpoints ==============
@simulation_bp.route('/<simulation_id>/run-status', methods=['GET'])
def get_run_status(simulation_id: str):
    """
    Report the live run status of a simulation (polled by the frontend).

    When the runner has no state for the simulation, an "idle" placeholder
    with zeroed counters is returned instead of an error. Otherwise the
    runner's full state is returned, e.g.:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "runner_status": "running",
                "current_round": 5,
                "total_rounds": 144,
                "progress_percent": 3.5,
                "simulated_hours": 2,
                "total_simulation_hours": 72,
                "twitter_running": true,
                "reddit_running": true,
                "twitter_actions_count": 150,
                "reddit_actions_count": 200,
                "total_actions_count": 350,
                "started_at": "2025-12-01T10:00:00",
                "updated_at": "2025-12-01T10:30:00"
            }
        }
    """
    try:
        run_state = SimulationRunner.get_run_state(simulation_id)
        if run_state:
            status_payload = run_state.to_dict()
        else:
            status_payload = {
                "simulation_id": simulation_id,
                "runner_status": "idle",
                "current_round": 0,
                "total_rounds": 0,
                "progress_percent": 0,
                "twitter_actions_count": 0,
                "reddit_actions_count": 0,
                "total_actions_count": 0,
            }
        return jsonify({
            "success": True,
            "data": status_payload
        })
    except Exception as exc:
        logger.error(f"Failed to get run status: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/run-status/detail', methods=['GET'])
def get_run_status_detail(simulation_id: str):
    """
    Return the run status together with the recorded agent actions.

    Used for the live activity feed in the frontend.

    Query parameters:
        platform: optional "twitter"/"reddit" filter. It narrows all_actions and
                  recent_actions; the per-platform lists for the other platform
                  come back empty.

    Added on top of the runner state:
        all_actions      - every action (respecting the filter)
        twitter_actions  - actions on Twitter
        reddit_actions   - actions on Reddit
        rounds_count     - number of recorded rounds
        recent_actions   - only the actions of the current round (empty before round 1)

    With no runner state, an "idle" placeholder with empty lists is returned.
    """
    try:
        run_state = SimulationRunner.get_run_state(simulation_id)
        platform_filter = request.args.get('platform')

        if not run_state:
            return jsonify({
                "success": True,
                "data": {
                    "simulation_id": simulation_id,
                    "runner_status": "idle",
                    "all_actions": [],
                    "twitter_actions": [],
                    "reddit_actions": []
                }
            })

        fetch_actions = SimulationRunner.get_all_actions

        everything = fetch_actions(
            simulation_id=simulation_id,
            platform=platform_filter
        )
        on_twitter = fetch_actions(
            simulation_id=simulation_id,
            platform="twitter"
        ) if (not platform_filter or platform_filter == "twitter") else []
        on_reddit = fetch_actions(
            simulation_id=simulation_id,
            platform="reddit"
        ) if (not platform_filter or platform_filter == "reddit") else []

        latest_round = run_state.current_round
        if latest_round > 0:
            latest_actions = fetch_actions(
                simulation_id=simulation_id,
                platform=platform_filter,
                round_num=latest_round
            )
        else:
            latest_actions = []

        detail = run_state.to_dict()
        detail.update({
            "all_actions": [action.to_dict() for action in everything],
            "twitter_actions": [action.to_dict() for action in on_twitter],
            "reddit_actions": [action.to_dict() for action in on_reddit],
            "rounds_count": len(run_state.rounds),
            "recent_actions": [action.to_dict() for action in latest_actions],
        })

        return jsonify({
            "success": True,
            "data": detail
        })
    except Exception as exc:
        logger.error(f"Failed to get detailed status: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/actions', methods=['GET'])
def get_simulation_actions(simulation_id: str):
    """
    Page through the agent action history of a simulation.

    Query parameters (all optional):
        limit: page size (default 100)
        offset: number of actions to skip (default 0)
        platform: "twitter" or "reddit"
        agent_id: only this agent's actions
        round_num: only actions from this round

    Returns:
        {"success": true, "data": {"count": <n>, "actions": [...]}}
    """
    try:
        query = request.args
        page = SimulationRunner.get_actions(
            simulation_id=simulation_id,
            limit=query.get('limit', 100, type=int),
            offset=query.get('offset', 0, type=int),
            platform=query.get('platform'),
            agent_id=query.get('agent_id', type=int),
            round_num=query.get('round_num', type=int)
        )
        return jsonify({
            "success": True,
            "data": {
                "count": len(page),
                "actions": [action.to_dict() for action in page]
            }
        })
    except Exception as exc:
        logger.error(f"Failed to get action history: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/timeline', methods=['GET'])
def get_simulation_timeline(simulation_id: str):
    """
    Return per-round summaries for the frontend progress bar / timeline view.

    Query parameters:
        start_round: first round to include (default 0)
        end_round: last round to include (default: no upper bound)
    """
    try:
        rounds = SimulationRunner.get_timeline(
            simulation_id=simulation_id,
            start_round=request.args.get('start_round', 0, type=int),
            end_round=request.args.get('end_round', type=int)
        )
        summary = {
            "rounds_count": len(rounds),
            "timeline": rounds
        }
        return jsonify({
            "success": True,
            "data": summary
        })
    except Exception as exc:
        logger.error(f"Failed to get timeline: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/agent-stats', methods=['GET'])
def get_agent_stats(simulation_id: str):
    """
    Return per-agent statistics (activity rankings, action distribution).
    """
    try:
        per_agent = SimulationRunner.get_agent_stats(simulation_id)
        return jsonify({
            "success": True,
            "data": {
                "agents_count": len(per_agent),
                "stats": per_agent
            }
        })
    except Exception as exc:
        logger.error(f"Failed to get Agent stats: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
# ============== Database query endpoints ==============
@simulation_bp.route('/<simulation_id>/posts', methods=['GET'])
def get_simulation_posts(simulation_id: str):
    """
    Get posts from a simulation

    Reads the ``post`` table of ``<platform>_simulation.db`` in the
    simulation directory, newest first.

    Query parameters:
        platform: platform type (twitter/reddit, default reddit)
        limit: result count (default 50)
        offset: offset (default 0)

    Returns the page of posts plus the total row count. When the database
    (or its ``post`` table) does not exist yet, or the platform is not one
    of twitter/reddit, an empty result is returned.
    """
    import sqlite3
    from contextlib import closing

    try:
        platform = request.args.get('platform', 'reddit')
        limit = request.args.get('limit', 50, type=int)
        offset = request.args.get('offset', 0, type=int)

        empty_response = {
            "success": True,
            "data": {
                "platform": platform,
                "total": 0,
                "count": 0,
                "posts": [],
                "message": t('api.dbNotExist')
            }
        }

        # Only known platforms map to a database file; this also keeps the
        # query-string value from being spliced into an arbitrary path.
        if platform not in ("twitter", "reddit"):
            return jsonify(empty_response)

        # Same data directory the other endpoints in this module use
        sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id)
        db_path = os.path.join(sim_dir, f"{platform}_simulation.db")
        if not os.path.exists(db_path):
            return jsonify(empty_response)

        # closing() guarantees the connection is released even if reading a
        # row fails; sqlite3's own context manager only commits/rolls back.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            try:
                posts = [
                    dict(row)
                    for row in conn.execute(
                        """
                        SELECT * FROM post
                        ORDER BY created_at DESC
                        LIMIT ? OFFSET ?
                        """,
                        (limit, offset),
                    ).fetchall()
                ]
                total = conn.execute("SELECT COUNT(*) FROM post").fetchone()[0]
            except sqlite3.OperationalError:
                # e.g. the post table has not been created yet
                posts = []
                total = 0

        return jsonify({
            "success": True,
            "data": {
                "platform": platform,
                "total": total,
                "count": len(posts),
                "posts": posts
            }
        })

    except Exception as e:
        logger.error(f"Failed to get posts: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/<simulation_id>/comments', methods=['GET'])
def get_simulation_comments(simulation_id: str):
    """
    Get comments from a simulation (Reddit only)

    Reads the ``comment`` table of ``reddit_simulation.db`` in the
    simulation directory, newest first.

    Query parameters:
        post_id: filter by post ID (optional)
        limit: result count (default 50)
        offset: offset (default 0)

    Returns an empty list when the database (or its ``comment`` table) does
    not exist yet.
    """
    import sqlite3
    from contextlib import closing

    try:
        post_id = request.args.get('post_id')
        limit = request.args.get('limit', 50, type=int)
        offset = request.args.get('offset', 0, type=int)

        # Same data directory the other endpoints in this module use
        sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id)
        db_path = os.path.join(sim_dir, "reddit_simulation.db")

        if not os.path.exists(db_path):
            return jsonify({
                "success": True,
                "data": {
                    "count": 0,
                    "comments": []
                }
            })

        if post_id:
            query = """
                SELECT * FROM comment
                WHERE post_id = ?
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?
            """
            params = (post_id, limit, offset)
        else:
            query = """
                SELECT * FROM comment
                ORDER BY created_at DESC
                LIMIT ? OFFSET ?
            """
            params = (limit, offset)

        # closing() guarantees the connection is released even if reading a
        # row fails; sqlite3's own context manager only commits/rolls back.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            try:
                comments = [dict(row) for row in conn.execute(query, params).fetchall()]
            except sqlite3.OperationalError:
                # e.g. the comment table has not been created yet
                comments = []

        return jsonify({
            "success": True,
            "data": {
                "count": len(comments),
                "comments": comments
            }
        })

    except Exception as e:
        logger.error(f"Failed to get comments: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== Interview endpoints ==============
@simulation_bp.route('/interview', methods=['POST'])
def interview_agent():
    """
    Ask a single agent a question inside a live simulation environment.

    The simulation environment must still be alive (after its loop finishes
    it waits for commands); otherwise 400 is returned. The prompt is prefixed
    via optimize_interview_prompt so the agent answers in text rather than
    calling tools.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",     // required
            "agent_id": 0,                   // required
            "prompt": "What do you think?",  // required
            "platform": "twitter",           // optional, twitter/reddit; omitted =
                                             // dual-platform simulations ask both
            "timeout": 60                    // optional, seconds (default 60)
        }

    Returns (no platform, dual-platform mode):
        {
            "success": true,
            "data": {
                "agent_id": 0,
                "prompt": "What do you think?",
                "result": {
                    "agent_id": 0,
                    "prompt": "...",
                    "platforms": {
                        "twitter": {"agent_id": 0, "response": "...", "platform": "twitter"},
                        "reddit": {"agent_id": 0, "response": "...", "platform": "reddit"}
                    }
                },
                "timestamp": "2025-12-08T10:00:01"
            }
        }

    Returns (platform specified):
        {
            "success": true,
            "data": {
                "agent_id": 0,
                "prompt": "What do you think?",
                "result": {
                    "agent_id": 0,
                    "response": "I think...",
                    "platform": "twitter",
                    "timestamp": "2025-12-08T10:00:00"
                },
                "timestamp": "2025-12-08T10:00:01"
            }
        }

    ValueError -> 400, TimeoutError -> 504, anything else -> 500.
    """
    try:
        body = request.get_json() or {}
        simulation_id = body.get('simulation_id')
        agent_id = body.get('agent_id')
        prompt = body.get('prompt')
        platform = body.get('platform')  # optional: twitter/reddit/None
        timeout = body.get('timeout', 60)

        # (check failed?, i18n key) — evaluated in the order errors are reported
        validation_rules = (
            (not simulation_id, 'api.requireSimulationId'),
            (agent_id is None, 'api.requireAgentId'),
            (not prompt, 'api.requirePrompt'),
            (bool(platform) and platform not in ("twitter", "reddit"), 'api.invalidInterviewPlatform'),
        )
        for failed, message_key in validation_rules:
            if failed:
                return jsonify({
                    "success": False,
                    "error": t(message_key)
                }), 400

        if not SimulationRunner.check_env_alive(simulation_id):
            return jsonify({
                "success": False,
                "error": t('api.envNotRunning')
            }), 400

        outcome = SimulationRunner.interview_agent(
            simulation_id=simulation_id,
            agent_id=agent_id,
            prompt=optimize_interview_prompt(prompt),
            platform=platform,
            timeout=timeout
        )
        return jsonify({
            "success": outcome.get("success", False),
            "data": outcome
        })

    except ValueError as exc:
        return jsonify({
            "success": False,
            "error": str(exc)
        }), 400
    except TimeoutError as exc:
        return jsonify({
            "success": False,
            "error": t('api.interviewTimeout', error=str(exc))
        }), 504
    except Exception as exc:
        logger.error(f"Interview failed: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/interview/batch', methods=['POST'])
def interview_agents_batch():
    """
    Batch interview multiple Agents

    Note: this feature requires the simulation environment to be running.
    Each prompt is prefixed via optimize_interview_prompt so agents answer in
    text rather than calling tools.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",   // required, simulation ID
            "interviews": [                // required, non-empty list of objects
                {
                    "agent_id": 0,
                    "prompt": "What do you think of A?",
                    "platform": "twitter"  // optional, specify platform for this agent
                },
                {
                    "agent_id": 1,
                    "prompt": "What do you think of B?"  // no platform: uses default
                }
            ],
            "platform": "reddit",          // optional, default platform (overridden per item)
                                           // if not specified: dual-platform each agent is interviewed on both
            "timeout": 120                 // optional, timeout in seconds, default 120
        }

    Each item must be an object with an ``agent_id`` and a non-empty
    ``prompt`` (the same prompt rule as /interview and /interview/all);
    the reported index is 1-based.

    Returns:
        {
            "success": true,
            "data": {
                "interviews_count": 2,
                "result": {
                    "interviews_count": 4,
                    "results": {
                        "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"},
                        "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"},
                        "twitter_1": {"agent_id": 1, "response": "...", "platform": "twitter"},
                        "reddit_1": {"agent_id": 1, "response": "...", "platform": "reddit"}
                    }
                },
                "timestamp": "2025-12-08T10:00:01"
            }
        }

    ValueError -> 400, TimeoutError -> 504, anything else -> 500.
    """
    try:
        data = request.get_json() or {}

        simulation_id = data.get('simulation_id')
        interviews = data.get('interviews')
        platform = data.get('platform')  # optional: twitter/reddit/None
        timeout = data.get('timeout', 120)

        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        if not interviews or not isinstance(interviews, list):
            return jsonify({
                "success": False,
                "error": t('api.requireInterviews')
            }), 400

        if platform and platform not in ("twitter", "reddit"):
            return jsonify({
                "success": False,
                "error": t('api.invalidInterviewPlatform')
            }), 400

        for index, interview in enumerate(interviews, start=1):
            # A non-object item would either raise on the `in` check or, for a
            # string, pass it via substring matching and fail later with a 500.
            if not isinstance(interview, dict) or 'agent_id' not in interview:
                return jsonify({
                    "success": False,
                    "error": t('api.interviewListMissingAgentId', index=index)
                }), 400
            if not interview.get('prompt'):
                return jsonify({
                    "success": False,
                    "error": t('api.interviewListMissingPrompt', index=index)
                }), 400
            item_platform = interview.get('platform')
            if item_platform and item_platform not in ("twitter", "reddit"):
                return jsonify({
                    "success": False,
                    "error": t('api.interviewListInvalidPlatform', index=index)
                }), 400

        if not SimulationRunner.check_env_alive(simulation_id):
            return jsonify({
                "success": False,
                "error": t('api.envNotRunning')
            }), 400

        # Copy each item so the caller's payload is not mutated
        optimized_interviews = [
            {**interview, 'prompt': optimize_interview_prompt(interview['prompt'])}
            for interview in interviews
        ]

        result = SimulationRunner.interview_agents_batch(
            simulation_id=simulation_id,
            interviews=optimized_interviews,
            platform=platform,
            timeout=timeout
        )

        return jsonify({
            "success": result.get("success", False),
            "data": result
        })

    except ValueError as e:
        return jsonify({
            "success": False,
            "error": str(e)
        }), 400

    except TimeoutError as e:
        return jsonify({
            "success": False,
            "error": t('api.batchInterviewTimeout', error=str(e))
        }), 504

    except Exception as e:
        logger.error(f"Batch interview failed: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/interview/all', methods=['POST'])
def interview_all_agents():
    """
    Put the same question to every agent of a live simulation.

    The simulation environment must still be alive; otherwise 400 is
    returned. The prompt is prefixed via optimize_interview_prompt so agents
    answer in text rather than calling tools.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",                     // required
            "prompt": "What is your overall take on this?",  // required, shared by all agents
            "platform": "reddit",                            // optional, twitter/reddit; omitted =
                                                             // each agent asked on both platforms
            "timeout": 180                                   // optional, seconds (default 180)
        }

    Returns:
        {
            "success": true,
            "data": {
                "interviews_count": 50,
                "result": {
                    "interviews_count": 100,
                    "results": {
                        "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"},
                        "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"},
                        ...
                    }
                },
                "timestamp": "2025-12-08T10:00:01"
            }
        }

    ValueError -> 400, TimeoutError -> 504, anything else -> 500.
    """
    try:
        body = request.get_json() or {}
        simulation_id = body.get('simulation_id')
        prompt = body.get('prompt')
        platform = body.get('platform')  # optional: twitter/reddit/None
        timeout = body.get('timeout', 180)

        # (check failed?, i18n key) — evaluated in the order errors are reported
        validation_rules = (
            (not simulation_id, 'api.requireSimulationId'),
            (not prompt, 'api.requirePrompt'),
            (bool(platform) and platform not in ("twitter", "reddit"), 'api.invalidInterviewPlatform'),
        )
        for failed, message_key in validation_rules:
            if failed:
                return jsonify({
                    "success": False,
                    "error": t(message_key)
                }), 400

        if not SimulationRunner.check_env_alive(simulation_id):
            return jsonify({
                "success": False,
                "error": t('api.envNotRunning')
            }), 400

        outcome = SimulationRunner.interview_all_agents(
            simulation_id=simulation_id,
            prompt=optimize_interview_prompt(prompt),
            platform=platform,
            timeout=timeout
        )
        return jsonify({
            "success": outcome.get("success", False),
            "data": outcome
        })

    except ValueError as exc:
        return jsonify({
            "success": False,
            "error": str(exc)
        }), 400
    except TimeoutError as exc:
        return jsonify({
            "success": False,
            "error": t('api.globalInterviewTimeout', error=str(exc))
        }), 504
    except Exception as exc:
        logger.error(f"Global interview failed: {str(exc)}")
        return jsonify({
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/interview/history', methods=['POST'])
def get_interview_history():
    """
    Get Interview history records.

    Reads the Interview records of a simulation via
    ``SimulationRunner.get_interview_history``.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",  // required, simulation ID
            "platform": "reddit",         // optional, "reddit" or "twitter";
                                          // if omitted, history from both platforms
            "agent_id": 0,                // optional, only this agent's history
            "limit": 100                  // optional, result count, default 100
        }

    Returns:
        {
            "success": true,
            "data": {
                "count": 10,
                "history": [
                    {
                        "agent_id": 0,
                        "response": "I think...",
                        "prompt": "What do you think about this?",
                        "timestamp": "2025-12-08T10:00:00",
                        "platform": "reddit"
                    },
                    ...
                ]
            }
        }

    Status codes: 400 when simulation_id is missing or platform is not one of
    twitter/reddit; 500 on unexpected errors.
    """
    try:
        data = request.get_json() or {}
        simulation_id = data.get('simulation_id')
        platform = data.get('platform')  # None -> history from both platforms
        agent_id = data.get('agent_id')
        limit = data.get('limit', 100)

        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        # Reject unknown platforms up front, consistent with the other
        # interview endpoints, instead of forwarding arbitrary values.
        if platform and platform not in ("twitter", "reddit"):
            return jsonify({
                "success": False,
                "error": t('api.invalidInterviewPlatform')
            }), 400

        history = SimulationRunner.get_interview_history(
            simulation_id=simulation_id,
            platform=platform,
            agent_id=agent_id,
            limit=limit
        )

        return jsonify({
            "success": True,
            "data": {
                "count": len(history),
                "history": history
            }
        })

    except Exception as e:
        logger.error(f"Failed to get interview history: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@simulation_bp.route('/env-status', methods=['POST'])
def get_env_status():
    """
    Report whether a simulation's environment is alive and can take Interview commands.

    Request body (JSON):
        simulation_id (str, required): simulation ID, e.g. "sim_xxxx".

    Response:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "env_alive": true,
                "twitter_available": true,
                "reddit_available": true,
                "message": "Environment is running and ready to receive Interview commands"
            }
        }

    The per-platform flags come from ``SimulationRunner.get_env_status_detail``
    and default to False when absent. Status codes: 400 if simulation_id is
    missing, 500 on unexpected errors.
    """
    try:
        body = request.get_json() or {}
        sim_id = body.get('simulation_id')
        if not sim_id:
            return jsonify({"success": False, "error": t('api.requireSimulationId')}), 400

        alive = SimulationRunner.check_env_alive(sim_id)
        detail = SimulationRunner.get_env_status_detail(sim_id)
        status_message = t('api.envRunning') if alive else t('api.envNotRunningShort')

        payload = {
            "simulation_id": sim_id,
            "env_alive": alive,
            "twitter_available": detail.get("twitter_available", False),
            "reddit_available": detail.get("reddit_available", False),
            "message": status_message,
        }
        return jsonify({"success": True, "data": payload})

    except Exception as e:
        logger.error(f"Failed to get environment status: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc(),
        }), 500
@simulation_bp.route('/close-env', methods=['POST'])
def close_simulation_env():
    """
    Close the simulation environment gracefully.

    Sends a close-environment command to the simulation so it leaves
    command-waiting mode and exits. Unlike /stop (which terminates the
    process), this lets the simulation shut down on its own.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",  // required, simulation ID
            "timeout": 30                 // optional, timeout in seconds, default 30
        }

    Returns:
        {
            "success": <runner success flag>,
            "data": {
                "message": "Environment close command sent",
                "result": {...},
                "timestamp": "2025-12-08T10:00:01"
            }
        }

    The stored simulation status is set to COMPLETED only when the runner
    reports success. Status codes: 400 for a missing simulation_id or a
    ValueError from the runner, 500 on unexpected errors.
    """
    try:
        data = request.get_json() or {}
        simulation_id = data.get('simulation_id')
        timeout = data.get('timeout', 30)

        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        result = SimulationRunner.close_simulation_env(
            simulation_id=simulation_id,
            timeout=timeout
        )
        closed = result.get("success", False)

        # Only persist COMPLETED once the close actually succeeded; if the
        # command failed the environment may still be running, and marking it
        # completed would misreport its state.
        if closed:
            manager = SimulationManager()
            state = manager.get_simulation(simulation_id)
            if state:
                state.status = SimulationStatus.COMPLETED
                manager._save_simulation_state(state)

        return jsonify({
            "success": closed,
            "data": result
        })

    except ValueError as e:
        return jsonify({
            "success": False,
            "error": str(e)
        }), 400

    except Exception as e:
        logger.error(f"Failed to close environment: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== F2-A: Agent CRUD endpoints ==============
@simulation_bp.route('/<simulation_id>/agent/<int:user_id>', methods=['PATCH'])
def patch_agent(simulation_id: str, user_id: int):
    """
    Update fields of one agent profile (Phase A/B fields).

    The request body (JSON) holds the fields to change and must be non-empty.
    The actual update is delegated to ``SimulationManager.patch_agent_profile``,
    which per the original contract marks the agent as ``manually_edited=True``.

    Error mapping:
        empty body                                -> 400
        ValueError / LookupError / FileNotFoundError -> 404
        PermissionError                           -> 403
        anything else                             -> 500 (with traceback)

    Returns ``{"success": true, "data": <updated profile>}`` on success.
    """
    try:
        changes = request.get_json() or {}
        if not changes:
            return jsonify({"success": False, "error": t('api.requireFields')}), 400

        def error_response(exc, status_code):
            return jsonify({"success": False, "error": str(exc)}), status_code

        manager = SimulationManager()
        try:
            updated_profile = manager.patch_agent_profile(simulation_id, user_id, changes)
        except ValueError as exc:
            return error_response(exc, 404)
        except PermissionError as exc:
            return error_response(exc, 403)
        except (LookupError, FileNotFoundError) as exc:
            return error_response(exc, 404)

        return jsonify({"success": True, "data": updated_profile})
    except Exception as e:
        logger.error(f"patch_agent failed: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/agent/<int:user_id>', methods=['DELETE'])
def delete_agent(simulation_id: str, user_id: int):
    """
    Remove one agent (by ``user_id``) from a simulation; intended for Phase A only.

    Deletion is delegated to ``SimulationManager.delete_agent_profile``
    (presumably it enforces the phase restriction via PermissionError — confirm).

    Error mapping:
        ValueError / LookupError / FileNotFoundError -> 404
        PermissionError                           -> 403
        anything else                             -> 500 (with traceback)

    Returns ``{"success": true, "data": {"deleted_user_id": user_id}}``.
    """
    try:
        def error_response(exc, status_code):
            return jsonify({"success": False, "error": str(exc)}), status_code

        manager = SimulationManager()
        try:
            manager.delete_agent_profile(simulation_id, user_id)
        except ValueError as exc:
            return error_response(exc, 404)
        except PermissionError as exc:
            return error_response(exc, 403)
        except (LookupError, FileNotFoundError) as exc:
            return error_response(exc, 404)

        return jsonify({"success": True, "data": {"deleted_user_id": user_id}})
    except Exception as e:
        logger.error(f"delete_agent failed: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/generate-config', methods=['POST'])
def generate_config_endpoint(simulation_id: str):
    """
    Transition a simulation from Phase A (profiles) to Phase B (configuration).

    Preconditions: the simulation exists (404), its status is
    ``profiles_ready`` (400) and its project exists (404). On success the
    status is switched to ``configuring`` and config generation starts on a
    background daemon thread. That thread writes ``simulation_config.json``
    and sets the status to ``ready`` (``config_generated=True``) on success,
    or fails the task and restores ``profiles_ready`` on error.

    Returns:
        ``{"success": true, "data": {"simulation_id", "task_id"}}``; poll
        ``task_id`` for progress. 500 on unexpected errors.
    """
    # json is imported locally (like the sibling routes) because the module
    # header does not import it; without it the background thread NameErrors.
    import json
    import threading
    from ..models.task import TaskManager, TaskStatus
    from ..services.simulation_config_generator import SimulationConfigGenerator

    try:
        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404

        if state.status != SimulationStatus.PROFILES_READY:
            return jsonify({
                "success": False,
                "error": t('api.requireProfilesReady', status=state.status.value)
            }), 400

        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({"success": False, "error": t('api.projectNotFound', id=state.project_id)}), 404

        simulation_requirement = project.get("simulation_requirement") or ""
        document_text = ProjectManager.get_extracted_text(state.project_id, get_storage()) or ""

        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="generate_config",
            metadata={"simulation_id": simulation_id}
        )

        state.status = SimulationStatus.CONFIGURING
        manager._save_simulation_state(state)

        # The locale is request-scoped; capture it so the worker thread
        # produces messages in the caller's language.
        current_locale = get_locale()

        def run_generate_config():
            set_locale(current_locale)
            try:
                task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0,
                                         message=t('progress.generatingSimConfig'))

                sim_dir = manager._get_simulation_dir(simulation_id)
                profiles_file = os.path.join(sim_dir, "reddit_profiles.json")
                with open(profiles_file, 'r', encoding='utf-8') as f:
                    profiles = json.load(f)

                # Resolve each agent's source entity. Best-effort: an entity that
                # is missing or fails to load is skipped (and logged) rather than
                # aborting config generation.
                entity_nodes = []
                reader = ZepEntityReader()
                for p in profiles:
                    uuid_ = p.get("source_entity_uuid")
                    if not uuid_:
                        continue
                    try:
                        entity = reader.get_entity_with_context(state.graph_id, uuid_)
                    except Exception as fetch_err:
                        logger.warning(f"generate_config: could not load entity {uuid_}: {fetch_err}")
                        continue
                    if entity:
                        entity_nodes.append(entity)

                gen = SimulationConfigGenerator(graph_id=state.graph_id)
                params = gen.generate_simulation_parameters(
                    simulation_requirement=simulation_requirement,
                    document_text=document_text,
                    entities=entity_nodes,
                )

                config_data = params.to_dict() if hasattr(params, 'to_dict') else {}
                config_file = os.path.join(sim_dir, "simulation_config.json")
                with open(config_file, 'w', encoding='utf-8') as f:
                    json.dump(config_data, f, ensure_ascii=False, indent=2)

                # Re-fetch the state: it may have been saved elsewhere meanwhile.
                state2 = manager.get_simulation(simulation_id)
                if state2:
                    state2.status = SimulationStatus.READY
                    state2.config_generated = True
                    manager._save_simulation_state(state2)

                task_manager.complete_task(task_id, result={"status": "prepared"})

            except Exception as e:
                logger.error(f"generate_config background failed: {e}")
                task_manager.fail_task(task_id, str(e))
                # Roll the simulation back so the user can retry; guard the
                # rollback so a second failure is logged instead of silently
                # killing the thread and leaving the status at 'configuring'.
                try:
                    state2 = manager.get_simulation(simulation_id)
                    if state2:
                        state2.status = SimulationStatus.PROFILES_READY
                        manager._save_simulation_state(state2)
                except Exception as rollback_err:
                    logger.error(f"generate_config status rollback failed: {rollback_err}")

        threading.Thread(target=run_generate_config, daemon=True).start()

        return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}})

    except Exception as e:
        logger.error(f"generate_config endpoint error: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/config', methods=['PATCH'])
def patch_simulation_config_endpoint(simulation_id: str):
    """
    Update global simulation config parameters (Phase B).

    The request body (JSON) holds the fields to change and must be non-empty;
    the update is delegated to ``SimulationManager.patch_simulation_config``.

    Error mapping:
        empty body                    -> 400
        ValueError / FileNotFoundError -> 404
        PermissionError               -> 403
        anything else                 -> 500 (with traceback)

    Returns ``{"success": true, "data": <updated config>}`` on success.
    """
    try:
        changes = request.get_json() or {}
        if not changes:
            return jsonify({"success": False, "error": t('api.requireFields')}), 400

        def error_response(exc, status_code):
            return jsonify({"success": False, "error": str(exc)}), status_code

        manager = SimulationManager()
        try:
            updated_config = manager.patch_simulation_config(simulation_id, changes)
        except ValueError as exc:
            return error_response(exc, 404)
        except PermissionError as exc:
            return error_response(exc, 403)
        except FileNotFoundError as exc:
            return error_response(exc, 404)

        return jsonify({"success": True, "data": updated_config})
    except Exception as e:
        logger.error(f"patch_simulation_config failed: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/clone', methods=['POST'])
def clone_simulation(simulation_id: str):
    """
    Clone a simulation into a project, copying its agent profiles.

    Request body (JSON):
        project_id (str, required): project that will own the clone.

    The clone is created by ``SimulationManager.clone_simulation``; per the
    original contract its status is ``profiles_ready``.

    Response data: ``new_simulation_id``, ``parent_simulation_id``, ``status``.
    Status codes: 400 if project_id is missing or the manager raises
    ValueError, 404 on LookupError, 500 on anything else.
    """
    try:
        body = request.get_json() or {}
        target_project_id = body.get('project_id')
        if not target_project_id:
            return jsonify({"success": False, "error": "project_id is required"}), 400

        manager = SimulationManager()
        try:
            cloned_state = manager.clone_simulation(simulation_id, target_project_id)
        except (LookupError, ValueError) as exc:
            # LookupError is checked first, matching the original clause order.
            status_code = 404 if isinstance(exc, LookupError) else 400
            return jsonify({"success": False, "error": str(exc)}), status_code

        response_data = {
            "new_simulation_id": cloned_state.simulation_id,
            "parent_simulation_id": simulation_id,
            "status": cloned_state.status.value,
        }
        return jsonify({"success": True, "data": response_data})
    except Exception as e:
        logger.error(f"clone_simulation failed: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/agent', methods=['POST'])
def create_agent(simulation_id: str):
    """
    Task 8 — Create a new agent from a graph entity UUID.

    Request (JSON):
        {
            "source_entity_uuid": "...",   // required, entity to build the agent from
            "extra_instructions": "..."    // optional, forwarded to the profile generator
        }

    Requires the simulation to exist (404) and its status to be
    ``profiles_ready`` or ``created`` (400). Generation runs on a background
    daemon thread; the response carries a ``task_id`` to poll. On success the
    new profile is appended to ``reddit_profiles.json`` with the next free
    ``user_id`` and ``manually_edited=False``, ``profiles_count`` is updated,
    and the task result is ``{"user_id": <new id>}``. The task fails if the
    entity cannot be found or an agent built from the same entity exists.
    """
    import json
    import threading
    from ..models.task import TaskManager, TaskStatus

    try:
        data = request.get_json() or {}
        source_entity_uuid = data.get("source_entity_uuid")
        if not source_entity_uuid:
            return jsonify({"success": False, "error": t('api.requireFields')}), 400
        extra_instructions = data.get("extra_instructions")

        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404

        allowed_statuses = {SimulationStatus.PROFILES_READY, SimulationStatus.CREATED}
        if state.status not in allowed_statuses:
            return jsonify({
                "success": False,
                "error": t('api.requireProfilesReady', status=state.status.value)
            }), 400

        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="create_agent",
            metadata={"simulation_id": simulation_id, "source_entity_uuid": source_entity_uuid}
        )

        # The locale is request-scoped; capture it for the worker thread.
        current_locale = get_locale()

        def load_profiles(path):
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)

        def ensure_not_duplicate(profiles):
            if any(p.get("source_entity_uuid") == source_entity_uuid for p in profiles):
                raise ValueError(f"Agent with source_entity_uuid '{source_entity_uuid}' already exists")

        def write_profiles_atomically(path, profiles):
            # Write a sibling temp file and swap it in with os.replace: readers
            # never observe a half-written file, even if the process dies mid-dump.
            tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(profiles, f, ensure_ascii=False, indent=2)
                os.replace(tmp_path, path)
            except BaseException:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
                raise

        def run_create_agent():
            set_locale(current_locale)
            try:
                task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0,
                                         message=t('progress.generatingProfile'))

                sim_dir = manager._get_simulation_dir(simulation_id)
                profiles_file = os.path.join(sim_dir, "reddit_profiles.json")

                # Fail fast on a duplicate before the slow entity fetch / generation.
                ensure_not_duplicate(load_profiles(profiles_file))

                reader = ZepEntityReader()
                entity = reader.get_entity_with_context(state.graph_id, source_entity_uuid)
                if not entity:
                    raise ValueError(f"Entity not found: {source_entity_uuid}")

                gen = OasisProfileGenerator(graph_id=state.graph_id)
                profile = gen.generate_profile_from_entity(entity, extra_instructions=extra_instructions)

                # Re-read right before writing: generation can take a long time,
                # and writing back an earlier snapshot would silently discard
                # agent edits/additions/deletions made in the meantime.
                profiles = load_profiles(profiles_file)
                ensure_not_duplicate(profiles)

                # Next id = highest integer user_id + 1 (0 for an empty list);
                # non-integer ids are ignored instead of crashing max().
                existing_ids = [p.get("user_id") for p in profiles if isinstance(p.get("user_id"), int)]
                next_user_id = max(existing_ids, default=-1) + 1
                profile.user_id = next_user_id

                profile_dict = profile.to_reddit_format()
                profile_dict["user_id"] = next_user_id
                profile_dict["source_entity_uuid"] = profile.source_entity_uuid
                profile_dict["source_entity_type"] = profile.source_entity_type
                profile_dict["manually_edited"] = False
                profiles.append(profile_dict)

                write_profiles_atomically(profiles_file, profiles)

                state2 = manager.get_simulation(simulation_id)
                if state2:
                    state2.profiles_count = len(profiles)
                    manager._save_simulation_state(state2)

                task_manager.complete_task(task_id, result={"user_id": next_user_id})

            except Exception as e:
                logger.error(f"create_agent background failed: {e}")
                task_manager.fail_task(task_id, str(e))

        threading.Thread(target=run_create_agent, daemon=True).start()

        return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}})

    except Exception as e:
        logger.error(f"create_agent endpoint error: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500
@simulation_bp.route('/<simulation_id>/agent/<int:user_id>/regenerate', methods=['POST'])
def regenerate_agent(simulation_id: str, user_id: int):
    """
    Task 9 — Regenerate an agent's profile from its source graph entity.

    Request (JSON, optional):
        {"extra_instructions": "..."}  // forwarded to the profile generator

    Checked synchronously, before any task is created:
        - the simulation exists (404)
        - its status is ``profiles_ready`` (400)
        - ``reddit_profiles.json`` exists (404) and contains ``user_id`` (404)
        - that agent has a ``source_entity_uuid`` (400)

    The regenerated profile keeps ``user_id``, resets ``manually_edited`` to
    False and replaces the old entry in place. Generation runs on a background
    daemon thread; the response carries a ``task_id`` to poll, whose result is
    ``{"user_id": user_id}`` on success.
    """
    import json
    import threading
    from ..models.task import TaskManager, TaskStatus

    try:
        data = request.get_json() or {}
        extra_instructions = data.get("extra_instructions")

        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404

        if state.status != SimulationStatus.PROFILES_READY:
            return jsonify({
                "success": False,
                "error": t('api.requireProfilesReady', status=state.status.value)
            }), 400

        sim_dir = manager._get_simulation_dir(simulation_id)
        profiles_file = os.path.join(sim_dir, "reddit_profiles.json")
        if not os.path.exists(profiles_file):
            return jsonify({"success": False, "error": t('api.simulationNotFound', id=simulation_id)}), 404

        def load_profiles():
            with open(profiles_file, 'r', encoding='utf-8') as f:
                return json.load(f)

        def find_agent_index(profiles):
            """Return the index of the entry whose user_id matches, or None."""
            return next((i for i, p in enumerate(profiles) if p.get("user_id") == user_id), None)

        def write_profiles_atomically(profiles):
            # Write a sibling temp file and swap it in with os.replace: readers
            # never observe a half-written file, even if the process dies mid-dump.
            tmp_path = f"{profiles_file}.{os.getpid()}.{threading.get_ident()}.tmp"
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(profiles, f, ensure_ascii=False, indent=2)
                os.replace(tmp_path, profiles_file)
            except BaseException:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
                raise

        # Validate the agent synchronously so obvious errors come back as HTTP
        # errors rather than failed tasks.
        profiles_snapshot = load_profiles()
        snapshot_idx = find_agent_index(profiles_snapshot)
        if snapshot_idx is None:
            return jsonify({"success": False, "error": t('api.agentNotFound', user_id=user_id)}), 404
        if not profiles_snapshot[snapshot_idx].get("source_entity_uuid"):
            return jsonify({"success": False, "error": t('api.agentNoSourceEntity')}), 400

        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="regenerate_agent",
            metadata={"simulation_id": simulation_id, "user_id": user_id}
        )

        # The locale is request-scoped; capture it for the worker thread.
        current_locale = get_locale()

        def run_regenerate_agent():
            set_locale(current_locale)
            try:
                task_manager.update_task(task_id, status=TaskStatus.PROCESSING, progress=0,
                                         message=t('progress.generatingProfile'))

                # Re-read: the agent may have changed since the request was validated.
                profiles = load_profiles()
                agent_idx = find_agent_index(profiles)
                if agent_idx is None:
                    raise LookupError(f"Agent with user_id {user_id} not found")
                entity_uuid = profiles[agent_idx].get("source_entity_uuid")
                if not entity_uuid:
                    raise ValueError(f"Agent {user_id} has no source_entity_uuid — cannot regenerate")

                reader = ZepEntityReader()
                entity = reader.get_entity_with_context(state.graph_id, entity_uuid)
                if not entity:
                    raise ValueError(f"Entity not found: {entity_uuid}")

                gen = OasisProfileGenerator(graph_id=state.graph_id)
                new_profile = gen.generate_profile_from_entity(entity, extra_instructions=extra_instructions)
                new_profile.user_id = user_id

                # Keep user_id, reset manually_edited.
                new_profile_dict = new_profile.to_reddit_format()
                new_profile_dict["user_id"] = user_id
                new_profile_dict["source_entity_uuid"] = new_profile.source_entity_uuid
                new_profile_dict["source_entity_type"] = new_profile.source_entity_type
                new_profile_dict["manually_edited"] = False

                # Re-read once more right before writing: generation can take a
                # long time, and writing back the earlier snapshot would discard
                # edits to other agents (or resurrect deleted ones / hit a
                # shifted index) made in the meantime.
                profiles = load_profiles()
                agent_idx = find_agent_index(profiles)
                if agent_idx is None:
                    raise LookupError(f"Agent with user_id {user_id} not found")
                profiles[agent_idx] = new_profile_dict

                write_profiles_atomically(profiles)

                task_manager.complete_task(task_id, result={"user_id": user_id})

            except Exception as e:
                logger.error(f"regenerate_agent background failed: {e}")
                task_manager.fail_task(task_id, str(e))

        threading.Thread(target=run_regenerate_agent, daemon=True).start()

        return jsonify({"success": True, "data": {"simulation_id": simulation_id, "task_id": task_id}})

    except Exception as e:
        logger.error(f"regenerate_agent endpoint error: {e}")
        return jsonify({"success": False, "error": str(e), "traceback": traceback.format_exc()}), 500