feat: add Private Impact simulation mode

- New simulation mode for private decision impact in closed relational networks
- RelationalAgentProfile extending OasisAgentProfile with 8 relational fields
- run_private_simulation.py — standalone engine without social platform
- PrivateImpactProfileGenerator, PrivateImpactConfigGenerator, PrivateImpactRunner
- Blueprint /api/private-impact with 7 endpoints
- ModeSelector.vue + PrivateImpactView.vue (5-step wizard)
- 6 surgical extensions in simulation_runner.py
- Zero breaking changes to existing public mode
This commit is contained in:
Cyril 2026-04-16 15:04:22 +02:00
parent fa0f6519b1
commit 89c436411b
23 changed files with 6052 additions and 91 deletions

View File

@ -1,23 +0,0 @@
.git
.github
.gitignore
.cursor
.DS_Store
.env
node_modules
frontend/node_modules
backend/.venv
.venv
.python-version
__pycache__
*.pyc
.pytest_cache
.mypy_cache
.ruff_cache
frontend/dist
frontend/.vite
backend/uploads

131
CONTEXT.md Normal file
View File

@ -0,0 +1,131 @@
# MiroFish — Fork Context (Private Impact Feature)
## Branche de travail
`feature/private-impact`
## Remotes
- `origin` → https://github.com/CyrilDEVIA/MiroResult.git (fork perso)
- `upstream` → https://github.com/666ghj/MiroFish.git (repo original)
---
## Historique des sessions
### 2026-04-16 — Session 1
#### Étapes terminées
- [x] **Prompt N°01** — Lecture complète du code source (audit, zéro modification)
- [x] **Prompt N°02** — Setup Git : fork + remote + branche `feature/private-impact`
- [x] **Prompt N°03** — Création `backend/scripts/run_private_simulation.py`
- [x] **Prompt N°04** — Création `backend/app/services/private_impact_profile_generator.py`
- [x] **Prompt N°05** — Création `backend/app/services/private_impact_config_generator.py`
- [x] **Prompt N°06** — Création `backend/app/services/private_impact_runner.py`
- [x] **Prompt N°07** — Blueprint `backend/app/api/private.py` + enregistrement (`api/__init__.py`, `app/__init__.py`)
- [x] **Prompt N°08** — Modification `backend/app/services/simulation_runner.py` (7 zones : champs private_*, start, monitor, read_log, check_completed, get_actions, cleanup)
- [x] **Prompt N°09** — Frontend : `api/private.js` + `ModeSelector.vue` + `PrivateImpactView.vue` + route `/private/:projectId`
- [x] **Prompt N°10**`action_logger.py` : ajout `get_private_logger()` + suppression fallback `run_private_simulation.py` + intégration `ModeSelector` dans `Home.vue`
#### Fichiers créés / modifiés
| Fichier | Action |
|---|---|
| `backend/scripts/run_private_simulation.py` | Créé — moteur de simulation privé |
| `backend/scripts/private/` | Créé — répertoire de sortie actions.jsonl |
| `backend/app/services/private_impact_profile_generator.py` | Créé — générateur de profils relationnels |
| `backend/app/services/private_impact_config_generator.py` | Créé — générateur de paramètres comportementaux |
| `backend/app/services/private_impact_runner.py` | Créé — orchestrateur subprocess + monitoring |
| `CONTEXT.md` | Créé — ce fichier |
| `backend/app/api/private.py` | Créé — blueprint Flask /api/private-impact (7 routes) |
| `backend/app/api/__init__.py` | Modifié — ajout private_bp + import private |
| `backend/app/__init__.py` | Modifié — enregistrement private_bp |
| `backend/app/services/simulation_runner.py` | Modifié — 7 zones private (champs, start, monitor, read_log, check, get_actions, cleanup) |
| `frontend/src/api/private.js` | Créé — client API private impact (7 fonctions) |
| `frontend/src/components/ModeSelector.vue` | Créé — sélecteur Public / Private Impact (2 cartes) |
| `frontend/src/views/PrivateImpactView.vue` | Créé — wizard 5 étapes (form → prepare → run → report → chat) |
| `frontend/src/router/index.js` | Modifié — route `/private/:projectId` ajoutée |
| `backend/scripts/action_logger.py` | Modifié — ajout `get_private_logger()` à `SimulationLogManager` |
| `backend/scripts/run_private_simulation.py` | Modifié — suppression fallback `hasattr`, appel direct `log_manager.get_private_logger()` |
| `frontend/src/views/Home.vue` | Modifié — intégration `ModeSelector` (right panel) + `handleModeSelected` + sessionStorage `pendingSimMode` |
#### Décisions d'architecture prises
- Pas d'env OASIS (pas de Twitter/Reddit/PlatformConfig)
- Appels LLM directs via `camel-ai ChatAgent` + `asyncio.to_thread()`
- Graphe relationnel construit depuis `cascade_influence` dans agent_configs
- `REACT_PRIVATELY` = invisible → ne propage pas l'exposition
- Tous les autres actions (sauf `DO_NOTHING`) cascade vers `cascade_influence` targets
- `zep_graph_memory_updater.py` réutilisé sans modification (platform="private")
- IPC `PrivateIPCHandler` : interviews via LLM direct (pas de SQLite)
- Output : `private/actions.jsonl` (même format JSONL que twitter/reddit)
- `RelationalAgentProfile` hérite de `OasisAgentProfile` — 8 champs relationnels ajoutés
- Encodage des dimensions relationnelles dans le champ `persona` (texte naturel)
- Fallback rule-based par type : Employee, Manager, Client, Competitor, Partner, FamilyMember
- `to_private_format()` retourne le dict lu par `run_private_simulation.py`
- `PrivateImpactConfigGenerator.generate_config()` : entrée = liste de dicts agents (issue de profile_generator), pas d'EntityNode direct
- `PrivateTimeConfig` : jours + rounds/jour (matin/midi/soir) — pas d'heures ni timezone
- `PrivateEventConfig` : injection par `decision_statement` — pas de posts sociaux
- `RelationalActivityConfig.exposure_round` : round 0 = exposition directe (distance 1)
- Fallback rule-based : table `RELATIONAL_FALLBACKS` dans le générateur (6 types)
- `PrivateImpactRunner` : même pattern classmethods que `SimulationRunner` (états en mémoire de classe)
- Config lue depuis `private_simulation_config.json` (≠ `simulation_config.json` OASIS)
- Log unique : `{sim_dir}/private/actions.jsonl` (une seule plateforme)
- `PrivateRunnerStatus` : enum séparé — pas de réutilisation de `RunnerStatus`
- `private_simulated_days` lu depuis le champ `simulated_day` du `round_end` event
- Frontend : CSS plain (pas Tailwind — non présent dans package.json) — même style que les vues existantes
- `ModeSelector.vue` : composant standalone, émet `@mode-selected` avec `"public"` ou `"private"`, à intégrer manuellement dans `Home.vue` ou `Process.vue`
- `PrivateImpactView.vue` : route `/private/:projectId` — charge le projet via `getProject()` pour récupérer `graph_id`
- Step 3 : polling `/api/private-impact/status/{simId}` toutes les 3s + affichage `recent_actions` depuis `to_detail_dict()`
- Step 4 : report via `generatePrivateReport()` → task_id → polling `getReportStatus(reportId)``getReport(reportId)` (réutilise le ReportManager existant)
- Step 5 : `chatAgents` reconstruit depuis la liste d'actions (agent_id + agent_name) ; chat via `interviewAgents()` (réutilise simulation.js)
- `SimulationRunState` : 5 champs `private_*` ajoutés (current_round, simulated_days, running, actions_count, completed)
- `add_action()` : elif platform=="private" → private_actions_count (évite comptage dans reddit)
- `to_dict()` : private_* inclus dans total_actions_count
- `start_simulation()` : elif platform=="private" → run_private_simulation.py + private_running=True
- `_monitor_simulation()` : lecture `private/actions.jsonl` dans la boucle ET en final ; private_running=False à la fin
- `_read_action_log()` : simulation_end → private_completed=True ; round_end → private_current_round + private_simulated_days (depuis simulated_day)
- `_check_all_platforms_completed()` : private_log + private_enabled + check private_completed
- `get_all_actions()` : bloc private après reddit (même pattern)
- `cleanup_simulation_logs()` : private_simulation.db + dirs_to_clean inclut "private"
- Blueprint `private_bp` enregistré sans url_prefix (les routes déclarent `/api/private-impact/...` en entier)
- `/prepare` stocke les métadonnées (graph_id, sim_requirement, agent_count…) dans `private_meta.json` dans le sim_dir
- `/prepare` appelle `ZepEntityReader.get_entities_by_type()` en boucle sur les types relationnels puis `PrivateImpactProfileGenerator.generate_profiles_from_entities()`
- `/start` lit `private_simulation_config.json` via `PrivateImpactRunner.start_simulation()`
- `/status` retourne `to_detail_dict()` (inclut `recent_actions`)
- `/report` réutilise `ReportAgent` avec `simulation_id=sim_id` et `graph_id` lu depuis `private_meta.json`
- `/cleanup` délègue entièrement à `PrivateImpactRunner.cleanup()`
- `get_private_logger()` ajouté à `SimulationLogManager` — même pattern que `get_twitter_logger()` / `get_reddit_logger()` ; fallback supprimé dans `run_private_simulation.py`
- `ModeSelector` intégré dans `Home.vue` (right panel, au-dessus de `.console-box`) ; mode stocké dans `sessionStorage` (`pendingSimMode`) — `MainView.vue` (N°11) doit lire ce flag et rediriger vers `/private/:projectId` après création du projet
---
## Prochaines étapes
| Prompt | Fichier cible | Action |
|---|---|---|
| N°04 | `backend/app/services/private_impact_profile_generator.py` | ✅ Terminé |
| N°05 | `backend/app/services/private_impact_config_generator.py` | ✅ Terminé |
| N°06 | `backend/app/services/private_impact_runner.py` | ✅ Terminé |
| N°07 | `backend/app/api/private.py` | ✅ Terminé |
| N°07 | `backend/app/api/__init__.py` | ✅ Terminé |
| N°07 | `backend/app/__init__.py` | ✅ Terminé |
| N°08 | `backend/app/services/simulation_runner.py` | ✅ Terminé |
| N°09 | `frontend/src/views/PrivateImpactView.vue` | ✅ Terminé |
| N°09 | `frontend/src/components/ModeSelector.vue` | ✅ Terminé |
| N°10 | `backend/scripts/action_logger.py` | ✅ Terminé — `get_private_logger()` ajouté |
| N°10 | `backend/scripts/run_private_simulation.py` | ✅ Terminé — fallback supprimé |
| N°10 | `frontend/src/views/Home.vue` | ✅ Terminé — ModeSelector intégré |
| N°11 | `frontend/src/views/MainView.vue` | Lire `sessionStorage.pendingSimMode` après création du projet → rediriger vers `/private/:projectId` si 'private' |
| N°11 | Test end-to-end | Préparer → Lancer → Observer actions.jsonl |
## Point d'attention — `MainView.vue` (N°11)
**Status : ⏳ PENDING**
`Home.vue` stocke `sessionStorage.pendingSimMode = 'private'` quand l'utilisateur sélectionne Private Impact.
`MainView.vue` doit être modifié pour lire ce flag après la création du projet + le build du graphe Zep, et rediriger vers `/private/:projectId` au lieu de rester sur la vue OASIS standard.
**Action requise** dans `MainView.vue` — après la séquence upload → create_project → build_graph :
```javascript
const pendingMode = sessionStorage.getItem('pendingSimMode')
if (pendingMode === 'private') {
sessionStorage.removeItem('pendingSimMode')
router.push(`/private/${projectId}`)
}
```

View File

@ -1,29 +0,0 @@
FROM python:3.11
# 安装 Node.js (满足 >=18及必要工具
RUN apt-get update \
&& apt-get install -y --no-install-recommends nodejs npm \
&& rm -rf /var/lib/apt/lists/*
# 从 uv 官方镜像复制 uv
COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/
WORKDIR /app
# 先复制依赖描述文件以利用缓存
COPY package.json package-lock.json ./
COPY frontend/package.json frontend/package-lock.json ./frontend/
COPY backend/pyproject.toml backend/uv.lock ./backend/
# 安装依赖Node + Python
RUN npm ci \
&& npm ci --prefix frontend \
&& cd backend && uv sync --frozen
# 复制项目源码
COPY . .
EXPOSE 3000 5001
# 同时启动前后端(开发模式)
CMD ["npm", "run", "dev"]

View File

@ -67,6 +67,8 @@ def create_app(config_class=Config):
app.register_blueprint(graph_bp, url_prefix='/api/graph')
app.register_blueprint(simulation_bp, url_prefix='/api/simulation')
app.register_blueprint(report_bp, url_prefix='/api/report')
from .api import private_bp
app.register_blueprint(private_bp)
# 健康检查
@app.route('/health')

View File

@ -7,8 +7,10 @@ from flask import Blueprint
graph_bp = Blueprint('graph', __name__)
simulation_bp = Blueprint('simulation', __name__)
report_bp = Blueprint('report', __name__)
private_bp = Blueprint('private', __name__)
from . import graph # noqa: E402, F401
from . import simulation # noqa: E402, F401
from . import report # noqa: E402, F401
from . import private # noqa: E402, F401

558
backend/app/api/private.py Normal file
View File

@ -0,0 +1,558 @@
"""
Private Impact API routes.
Exposes the /api/private-impact endpoints for the Private Impact simulation mode.
Follows the same error handling and JSON response format as graph/simulation/report blueprints.
"""
import json
import os
import traceback
import threading
import uuid
from datetime import datetime
from flask import request, jsonify
from . import private_bp
from ..config import Config
from ..services.private_impact_profile_generator import PrivateImpactProfileGenerator
from ..services.private_impact_config_generator import PrivateImpactConfigGenerator
from ..services.private_impact_runner import PrivateImpactRunner
from ..services.zep_entity_reader import ZepEntityReader
from ..services.report_agent import ReportAgent, ReportManager, ReportStatus
from ..models.task import TaskManager, TaskStatus
from ..models.project import ProjectManager
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
logger = get_logger('mirofish.api.private')
# Simulation data directory (same root as PrivateImpactRunner.RUN_STATE_DIR)
_SIM_DIR = os.path.join(os.path.dirname(__file__), '../../uploads/simulations')
# Relational entity types recognised by PrivateImpactProfileGenerator
_RELATIONAL_ENTITY_TYPES = [
"employee", "manager", "client", "competitor",
"partner", "familymember", "colleague", "investor",
]
# ── Helpers ────────────────────────────────────────────────────────────────────
def _sim_dir(sim_id: str) -> str:
"""Return absolute path to the simulation directory."""
return os.path.join(_SIM_DIR, sim_id)
def _meta_path(sim_id: str) -> str:
return os.path.join(_sim_dir(sim_id), "private_meta.json")
def _read_meta(sim_id: str) -> dict:
"""Load private_meta.json; return empty dict if missing."""
path = _meta_path(sim_id)
if not os.path.exists(path):
return {}
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
def _write_meta(sim_id: str, meta: dict) -> None:
"""Persist private_meta.json to disk."""
os.makedirs(_sim_dir(sim_id), exist_ok=True)
with open(_meta_path(sim_id), 'w', encoding='utf-8') as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
# ── POST /api/private-impact/prepare ──────────────────────────────────────────
@private_bp.route('/private-impact/prepare', methods=['POST'])
def prepare_private_simulation():
"""
Prepare a Private Impact simulation.
Reads relational entities from the Zep graph, generates RelationalAgentProfile
instances via PrivateImpactProfileGenerator, then generates the full
PrivateSimulationParameters via PrivateImpactConfigGenerator.
Saves private_agents.json and private_simulation_config.json in the sim dir.
Request (JSON):
{
"graph_id": "mirofish_xxxx", // Required (or project_id)
"project_id": "proj_xxxx", // Optional used to resolve graph_id / sim_requirement
"simulation_requirement": "...", // Required if no project_id
"decision_context": "...", // Optional
"use_llm": true, // Optional, default true
"entity_types": ["employee", ...], // Optional filter (defaults to all relational types)
"sim_id": "private_xxxx" // Optional reuse an existing sim_id
}
Returns:
{ "success": true, "data": { "sim_id": "...", "agent_count": N, "status": "prepared" } }
"""
try:
data = request.get_json() or {}
# Resolve graph_id and simulation_requirement
project_id = data.get('project_id')
graph_id = data.get('graph_id')
simulation_requirement = data.get('simulation_requirement', '')
decision_context = data.get('decision_context', '')
if project_id:
project = ProjectManager.get_project(project_id)
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=project_id)
}), 404
graph_id = graph_id or project.graph_id
simulation_requirement = simulation_requirement or project.simulation_requirement or ''
if not graph_id:
return jsonify({
"success": False,
"error": "graph_id is required"
}), 400
if not simulation_requirement:
return jsonify({
"success": False,
"error": "simulation_requirement is required"
}), 400
if not Config.ZEP_API_KEY:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
}), 500
# Create or reuse sim_id
sim_id = data.get('sim_id') or f"private_{uuid.uuid4().hex[:12]}"
os.makedirs(_sim_dir(sim_id), exist_ok=True)
use_llm = data.get('use_llm', True)
entity_types = data.get('entity_types') or _RELATIONAL_ENTITY_TYPES
# Read relational entities from Zep
reader = ZepEntityReader()
all_entities = []
for etype in entity_types:
try:
found = reader.get_entities_by_type(
graph_id=graph_id,
entity_type=etype,
enrich_with_edges=True,
)
all_entities.extend(found)
logger.info(f"[PRIVATE] {len(found)} '{etype}' entities read")
except Exception as e:
logger.warning(f"[PRIVATE] Could not read '{etype}' entities: {e}")
if not all_entities:
return jsonify({
"success": False,
"error": "No relational entities found in the graph for the given entity types."
}), 404
# Generate RelationalAgentProfile instances
profile_generator = PrivateImpactProfileGenerator()
profiles_path = os.path.join(_sim_dir(sim_id), "private_agents.json")
profiles = profile_generator.generate_profiles_from_entities(
entities=all_entities,
use_llm=use_llm,
graph_id=graph_id,
realtime_output_path=profiles_path,
)
# Serialize profiles for config generator
agent_dicts = [p.to_private_format() for p in profiles]
# Save profiles file (final)
with open(profiles_path, 'w', encoding='utf-8') as f:
json.dump(agent_dicts, f, ensure_ascii=False, indent=2)
# Generate PrivateSimulationParameters
config_generator = PrivateImpactConfigGenerator()
params = config_generator.generate_config(
agent_profiles=agent_dicts,
simulation_requirement=simulation_requirement,
decision_context=decision_context,
)
# Save private_simulation_config.json (consumed by PrivateImpactRunner)
config_path = os.path.join(_sim_dir(sim_id), "private_simulation_config.json")
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(params.to_dict(), f, ensure_ascii=False, indent=2)
# Persist metadata for subsequent endpoints
_write_meta(sim_id, {
"sim_id": sim_id,
"project_id": project_id,
"graph_id": graph_id,
"simulation_requirement": simulation_requirement,
"decision_context": decision_context,
"agent_count": len(profiles),
"created_at": datetime.now().isoformat(),
"status": "prepared",
})
logger.info(
f"[PRIVATE] Simulation prepared: {sim_id}, "
f"agents={len(profiles)}, graph_id={graph_id}"
)
return jsonify({
"success": True,
"data": {
"sim_id": sim_id,
"agent_count": len(profiles),
"status": "prepared",
}
})
except Exception as e:
logger.error(f"[PRIVATE] Prepare failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── POST /api/private-impact/start ────────────────────────────────────────────
@private_bp.route('/private-impact/start', methods=['POST'])
def start_private_simulation():
"""
Launch a prepared Private Impact simulation.
Request (JSON):
{
"sim_id": "private_xxxx", // Required
"max_rounds": null, // Optional
"enable_graph_memory_update": false, // Optional
"graph_id": null // Optional (required if enable_graph_memory_update)
}
Returns:
{ "success": true, "data": { "sim_id": "...", "status": "running" } }
"""
try:
data = request.get_json() or {}
sim_id = data.get('sim_id')
if not sim_id:
return jsonify({
"success": False,
"error": "sim_id is required"
}), 400
max_rounds = data.get('max_rounds')
enable_graph_memory = data.get('enable_graph_memory_update', False)
graph_id = data.get('graph_id')
# Resolve graph_id from meta if needed
if enable_graph_memory and not graph_id:
meta = _read_meta(sim_id)
graph_id = meta.get('graph_id')
state = PrivateImpactRunner.start_simulation(
simulation_id=sim_id,
max_rounds=max_rounds,
enable_graph_memory_update=enable_graph_memory,
graph_id=graph_id,
)
return jsonify({
"success": True,
"data": {
"sim_id": sim_id,
"status": state.runner_status.value,
}
})
except ValueError as e:
return jsonify({
"success": False,
"error": str(e)
}), 400
except Exception as e:
logger.error(f"[PRIVATE] Start failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── GET /api/private-impact/status/<sim_id> ───────────────────────────────────
@private_bp.route('/private-impact/status/<sim_id>', methods=['GET'])
def get_private_status(sim_id: str):
"""
Return the current run state of a Private Impact simulation.
Returns:
{ "success": true, "data": PrivateSimulationRunState.to_dict() }
"""
try:
state = PrivateImpactRunner.get_status(sim_id)
if not state:
return jsonify({
"success": False,
"error": f"No private simulation found for sim_id: {sim_id}"
}), 404
return jsonify({
"success": True,
"data": state.to_detail_dict()
})
except Exception as e:
logger.error(f"[PRIVATE] Get status failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── POST /api/private-impact/stop/<sim_id> ────────────────────────────────────
@private_bp.route('/private-impact/stop/<sim_id>', methods=['POST'])
def stop_private_simulation(sim_id: str):
"""
Stop a running Private Impact simulation.
Returns:
{ "success": true, "data": { "sim_id": "...", "status": "stopped" } }
"""
try:
state = PrivateImpactRunner.stop_simulation(sim_id)
return jsonify({
"success": True,
"data": {
"sim_id": sim_id,
"status": state.runner_status.value,
}
})
except ValueError as e:
return jsonify({
"success": False,
"error": str(e)
}), 400
except Exception as e:
logger.error(f"[PRIVATE] Stop failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── GET /api/private-impact/actions/<sim_id> ──────────────────────────────────
@private_bp.route('/private-impact/actions/<sim_id>', methods=['GET'])
def get_private_actions(sim_id: str):
"""
Return the full private action log for a simulation.
Query params:
agent_id: Filter by agent ID (optional, int)
round_num: Filter by round number (optional, int)
Returns:
{ "success": true, "data": [...], "count": N }
"""
try:
agent_id_raw = request.args.get('agent_id')
round_num_raw = request.args.get('round_num')
agent_id = int(agent_id_raw) if agent_id_raw is not None else None
round_num = int(round_num_raw) if round_num_raw is not None else None
actions = PrivateImpactRunner.get_all_actions(
simulation_id=sim_id,
agent_id=agent_id,
round_num=round_num,
)
return jsonify({
"success": True,
"data": [a.to_dict() for a in actions],
"count": len(actions)
})
except Exception as e:
logger.error(f"[PRIVATE] Get actions failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── POST /api/private-impact/report/<sim_id> ──────────────────────────────────
@private_bp.route('/private-impact/report/<sim_id>', methods=['POST'])
def generate_private_report(sim_id: str):
"""
Generate an analysis report for a Private Impact simulation.
Reuses ReportAgent from report_agent.py with the private simulation actions.
Launches the generation in a background thread and returns a task_id immediately.
Request (JSON):
{ "force_regenerate": false } // Optional
Returns:
{ "success": true, "data": { "sim_id": "...", "report_id": "...", "task_id": "..." } }
"""
try:
data = request.get_json() or {}
force_regenerate = data.get('force_regenerate', False)
meta = _read_meta(sim_id)
graph_id = meta.get('graph_id')
simulation_requirement = meta.get('simulation_requirement', '')
if not graph_id:
return jsonify({
"success": False,
"error": f"No metadata found for sim_id: {sim_id}. Run /prepare first."
}), 404
# Check for an existing completed report
if not force_regenerate:
existing = ReportManager.get_report_by_simulation(sim_id)
if existing and existing.status == ReportStatus.COMPLETED:
return jsonify({
"success": True,
"data": {
"sim_id": sim_id,
"report_id": existing.report_id,
"status": "completed",
"already_generated": True,
}
})
# Pre-generate report_id so the frontend can track immediately
report_id = f"report_{uuid.uuid4().hex[:12]}"
task_manager = TaskManager()
task_id = task_manager.create_task(
task_type="private_report_generate",
metadata={
"sim_id": sim_id,
"graph_id": graph_id,
"report_id": report_id,
}
)
current_locale = get_locale()
def run_generate():
set_locale(current_locale)
try:
task_manager.update_task(
task_id,
status=TaskStatus.PROCESSING,
progress=0,
message="Initialising Report Agent for private simulation..."
)
agent = ReportAgent(
graph_id=graph_id,
simulation_id=sim_id,
simulation_requirement=simulation_requirement,
)
def progress_callback(stage, progress, message):
task_manager.update_task(
task_id,
progress=progress,
message=f"[{stage}] {message}"
)
report = agent.generate_report(
progress_callback=progress_callback,
report_id=report_id,
)
ReportManager.save_report(report)
if report.status == ReportStatus.COMPLETED:
task_manager.complete_task(
task_id,
result={
"report_id": report.report_id,
"sim_id": sim_id,
"status": "completed",
}
)
else:
task_manager.fail_task(task_id, report.error or "Report generation failed")
except Exception as exc:
logger.error(f"[PRIVATE] Report generation failed: {exc}")
task_manager.fail_task(task_id, str(exc))
thread = threading.Thread(target=run_generate, daemon=True)
thread.start()
return jsonify({
"success": True,
"data": {
"sim_id": sim_id,
"report_id": report_id,
"task_id": task_id,
"status": "generating",
"already_generated": False,
}
})
except Exception as e:
logger.error(f"[PRIVATE] Report trigger failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
# ── DELETE /api/private-impact/cleanup/<sim_id> ───────────────────────────────
@private_bp.route('/private-impact/cleanup/<sim_id>', methods=['DELETE'])
def cleanup_private_simulation(sim_id: str):
"""
Remove Private Impact simulation artifacts to allow a fresh restart.
Deletes run_state.json, simulation.log, private_simulation.db, private/ directory.
Does NOT delete private_simulation_config.json or profile files.
Returns:
{ "success": true, "data": { "sim_id": "...", "cleaned_files": [...] } }
"""
try:
result = PrivateImpactRunner.cleanup(sim_id)
return jsonify({
"success": result["success"],
"data": {
"sim_id": sim_id,
"cleaned_files": result["cleaned_files"],
"errors": result["errors"],
}
})
except Exception as e:
logger.error(f"[PRIVATE] Cleanup failed: {str(e)}")
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500

View File

@ -0,0 +1,735 @@
"""
Private Impact Config Generator
Generates behavioral parameters for RelationalAgents in Private Impact mode.
Equivalent of simulation_config_generator.py for the private relational network.
Key differences from SimulationConfigGenerator:
- No PlatformConfig (no social network)
- Time is measured in days with rounds per day (morning / noon / evening)
- RelationalActivityConfig replaces AgentActivityConfig
- PrivateEventConfig uses a decision statement instead of initial posts
- LLM calls: time config event config agent configs (batches of 15)
"""
import json
import math
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from openai import OpenAI
from ..config import Config
from ..utils.logger import get_logger
from ..utils.locale import get_language_instruction
logger = get_logger('mirofish.private_impact_config')
# ── Dataclasses ───────────────────────────────────────────────────────────────
@dataclass
class RelationalActivityConfig:
"""
Behavioral activity configuration for a single RelationalAgent.
Equivalent of AgentActivityConfig for the private simulation mode.
No posting frequencies private reactions replace social media posts.
"""
agent_id: int
entity_uuid: str
entity_name: str
relational_link_type: str # employee | manager | client | competitor | partner | familymember
# Activity parameters
activity_level: float = 0.5 # Overall engagement level (0.01.0)
response_delay_min: int = 0 # Min reaction delay (days)
response_delay_max: int = 3 # Max reaction delay (days)
# Behavioral stance toward the decision
sentiment_bias: float = 0.0 # -1.0 (hostile) → 1.0 (supportive)
stance: str = "neutral" # supportive | opposing | neutral | observer
# Influence within the relational graph
influence_weight: float = 1.0 # Weight for cascade propagation
# Relational graph: agent_ids this agent can expose to the decision
cascade_influence: List[int] = field(default_factory=list)
# Simulation round at which this agent is first exposed to the decision
exposure_round: int = 0 # 0 = exposed at the very first round
@dataclass
class PrivateTimeConfig:
"""
Time configuration for the Private Impact simulation.
Replaces TimeSimulationConfig (Twitter/hour-based) with a day-based,
relational-rhythm model: days × rounds per day.
"""
total_simulation_days: int = 30 # Total simulated days
rounds_per_day: int = 3 # Morning / noon / evening
reaction_delay_days_min: int = 0 # Min delay before an agent reacts
reaction_delay_days_max: int = 7 # Max delay before an agent reacts
@dataclass
class PrivateEventConfig:
"""
Event configuration for the Private Impact simulation.
Replaces EventConfig (social posts) with a decision injection model.
"""
decision_statement: str = "" # The decision injected into the network
decision_maker_profile: str = "" # Short description of the decision maker
hot_topics: List[str] = field(default_factory=list) # Related sensitive topics
initial_exposed_agent_ids: List[int] = field( # Distance-1 agents (heard it first)
default_factory=list
)
@dataclass
class PrivateSimulationParameters:
"""Complete parameter set for a Private Impact simulation."""
time_config: PrivateTimeConfig = field(default_factory=PrivateTimeConfig)
agent_configs: List[RelationalActivityConfig] = field(default_factory=list)
event_config: PrivateEventConfig = field(default_factory=PrivateEventConfig)
# LLM metadata
llm_model: str = ""
llm_base_url: str = ""
generated_at: str = field(default_factory=lambda: datetime.now().isoformat())
generation_reasoning: str = ""
def to_dict(self) -> Dict[str, Any]:
"""Serialize to plain dict."""
return {
"time_config": asdict(self.time_config),
"agent_configs": [asdict(a) for a in self.agent_configs],
"event_config": asdict(self.event_config),
"llm_model": self.llm_model,
"llm_base_url": self.llm_base_url,
"generated_at": self.generated_at,
"generation_reasoning": self.generation_reasoning,
}
def to_json(self, indent: int = 2) -> str:
"""Serialize to JSON string."""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)
# ── PrivateImpactConfigGenerator ──────────────────────────────────────────────
class PrivateImpactConfigGenerator:
"""
Generates PrivateSimulationParameters for the Private Impact simulation.
Equivalent of SimulationConfigGenerator for the private relational mode.
Uses 3 sequential LLM calls:
1. PrivateTimeConfig relational rhythm (days, rounds)
2. PrivateEventConfig decision injection setup
3. RelationalActivityConfig list batches of AGENTS_PER_BATCH
Falls back to a rule-based table per relational type on LLM failure.
"""
AGENTS_PER_BATCH = 15
# Context length limits (characters)
TIME_CONFIG_CONTEXT_LENGTH = 8000
EVENT_CONFIG_CONTEXT_LENGTH = 6000
AGENT_SUMMARY_LENGTH = 300
# Rule-based fallback table by relational type
# Keys: activity_level, response_delay_min, response_delay_max, stance, influence_weight
RELATIONAL_FALLBACKS: Dict[str, Dict[str, Any]] = {
"employee": {
"activity_level": 0.6,
"response_delay_min": 0,
"response_delay_max": 3,
"sentiment_bias": 0.0,
"stance": "neutral",
"influence_weight": 0.8,
},
"manager": {
"activity_level": 0.5,
"response_delay_min": 0,
"response_delay_max": 1,
"sentiment_bias": 0.0,
"stance": "neutral",
"influence_weight": 1.5,
},
"client": {
"activity_level": 0.3,
"response_delay_min": 2,
"response_delay_max": 7,
"sentiment_bias": 0.0,
"stance": "observer",
"influence_weight": 1.2,
},
"competitor": {
"activity_level": 0.2,
"response_delay_min": 1,
"response_delay_max": 5,
"sentiment_bias": -0.3,
"stance": "opposing",
"influence_weight": 1.0,
},
"partner": {
"activity_level": 0.4,
"response_delay_min": 0,
"response_delay_max": 2,
"sentiment_bias": 0.1,
"stance": "neutral",
"influence_weight": 1.3,
},
"familymember": {
"activity_level": 0.7,
"response_delay_min": 0,
"response_delay_max": 0,
"sentiment_bias": 0.4,
"stance": "supportive",
"influence_weight": 0.9,
},
}
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model_name: Optional[str] = None,
):
self.api_key = api_key or Config.LLM_API_KEY
self.base_url = base_url or Config.LLM_BASE_URL
self.model_name = model_name or Config.LLM_MODEL_NAME
if not self.api_key:
raise ValueError("LLM_API_KEY is not configured")
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url,
)
# ── Public API ─────────────────────────────────────────────────────────────
def generate_config(
self,
agent_profiles: List[Dict[str, Any]],
simulation_requirement: str,
decision_context: str = "",
progress_callback: Optional[Callable[[int, int, str], None]] = None,
) -> PrivateSimulationParameters:
"""
Generate the complete PrivateSimulationParameters.
Performs 3 sequential LLM call groups:
Step 1 PrivateTimeConfig (relational rhythm)
Step 2 PrivateEventConfig (decision injection)
Step 3-N RelationalActivityConfig batches (AGENTS_PER_BATCH each)
Falls back to rule-based generation per step on LLM failure.
Args:
agent_profiles: List of agent dicts from private_impact_profile_generator.
Each dict must include: agent_id, entity_name, relational_link_type,
cascade_influence, source_entity_uuid, persona (optional).
simulation_requirement: Natural language description of the simulation goal.
decision_context: Additional context about the decision or the organization.
progress_callback: Optional callback(current_step, total_steps, message).
Returns:
PrivateSimulationParameters ready for run_private_simulation.py.
"""
num_agents = len(agent_profiles)
num_batches = math.ceil(num_agents / self.AGENTS_PER_BATCH)
total_steps = 2 + num_batches # time + event + N agent batches
current_step = 0
def report(step: int, message: str) -> None:
nonlocal current_step
current_step = step
if progress_callback:
progress_callback(step, total_steps, message)
logger.info(f"[{step}/{total_steps}] {message}")
context = self._build_context(
simulation_requirement=simulation_requirement,
decision_context=decision_context,
agent_profiles=agent_profiles,
)
reasoning_parts: List[str] = []
# ── Step 1: PrivateTimeConfig ─────────────────────────────────────────
report(1, "Generating relational time configuration...")
time_result = self._generate_time_config(context, num_agents)
time_config = self._parse_time_config(time_result)
reasoning_parts.append(f"Time: {time_result.get('reasoning', 'ok')}")
# ── Step 2: PrivateEventConfig ────────────────────────────────────────
report(2, "Generating decision event configuration...")
event_result = self._generate_event_config(context, simulation_requirement, agent_profiles)
event_config = self._parse_event_config(event_result, agent_profiles)
reasoning_parts.append(f"Event: {event_result.get('reasoning', 'ok')}")
# ── Steps 3-N: RelationalActivityConfig batches ───────────────────────
all_agent_configs: List[RelationalActivityConfig] = []
for batch_idx in range(num_batches):
start = batch_idx * self.AGENTS_PER_BATCH
end = min(start + self.AGENTS_PER_BATCH, num_agents)
batch = agent_profiles[start:end]
report(
3 + batch_idx,
f"Generating agent configs {start + 1}{end} / {num_agents}...",
)
batch_configs = self._generate_agent_configs_batch(
context=context,
agent_profiles=batch,
start_idx=start,
simulation_requirement=simulation_requirement,
)
all_agent_configs.extend(batch_configs)
reasoning_parts.append(f"Agents: {len(all_agent_configs)} configured")
return PrivateSimulationParameters(
time_config=time_config,
agent_configs=all_agent_configs,
event_config=event_config,
llm_model=self.model_name,
llm_base_url=self.base_url,
generation_reasoning=" | ".join(reasoning_parts),
)
# ── Context builder ────────────────────────────────────────────────────────
def _build_context(
self,
simulation_requirement: str,
decision_context: str,
agent_profiles: List[Dict[str, Any]],
) -> str:
"""Build the shared LLM context string for all generation steps."""
by_type: Dict[str, int] = {}
for a in agent_profiles:
rtype = a.get("relational_link_type", "unknown")
by_type[rtype] = by_type.get(rtype, 0) + 1
type_summary = "\n".join(
f" - {rtype}: {count}" for rtype, count in sorted(by_type.items())
)
parts = [
f"## Simulation Requirement\n{simulation_requirement}",
f"\n## Relational Network ({len(agent_profiles)} agents)\n{type_summary}",
]
if decision_context:
parts.append(f"\n## Decision Context\n{decision_context[:3000]}")
return "\n".join(parts)
# ── Step 1: Time config ────────────────────────────────────────────────────
def _generate_time_config(
self, context: str, num_agents: int
) -> Dict[str, Any]:
"""Generate PrivateTimeConfig via LLM."""
context_truncated = context[:self.TIME_CONFIG_CONTEXT_LENGTH]
prompt = f"""Based on the following private impact simulation context, generate a relational time configuration.
{context_truncated}
## Task
Generate a time configuration for a private relational network simulation.
Unlike social media, this is a closed human network where decisions propagate
over days through interpersonal channels (conversations, emails, hallway talks).
Return a JSON object (no markdown):
{{
"total_simulation_days": <int, 790, how many days until the network reaches equilibrium>,
"rounds_per_day": <int, 24, typically 3: morning/noon/evening>,
"reaction_delay_days_min": <int, 03>,
"reaction_delay_days_max": <int, 114>,
"reasoning": "<brief explanation>"
}}
Guidelines:
- Major organizational decisions: 3060 days
- Personal or family decisions: 721 days
- Sudden crises: 714 days
- rounds_per_day=3 is the standard (morning / noon / evening)
- reaction_delay_days_max should not exceed total_simulation_days / 4"""
system_prompt = (
"You are an expert in organizational psychology and network dynamics. "
"Return pure JSON only — no markdown. "
f"{get_language_instruction()}"
)
try:
return self._call_llm_with_retry(prompt, system_prompt)
except Exception as e:
logger.warning(f"Time config LLM failed: {e}. Using defaults.")
return self._default_time_config()
def _default_time_config(self) -> Dict[str, Any]:
"""Rule-based default time config."""
return {
"total_simulation_days": 30,
"rounds_per_day": 3,
"reaction_delay_days_min": 0,
"reaction_delay_days_max": 7,
"reasoning": "Default relational time config",
}
def _parse_time_config(self, result: Dict[str, Any]) -> PrivateTimeConfig:
"""Parse and validate PrivateTimeConfig from LLM result."""
total_days = max(7, int(result.get("total_simulation_days", 30)))
rounds_per_day = max(2, min(4, int(result.get("rounds_per_day", 3))))
delay_min = max(0, int(result.get("reaction_delay_days_min", 0)))
delay_max = max(delay_min, int(result.get("reaction_delay_days_max", 7)))
if delay_max >= total_days:
delay_max = max(delay_min + 1, total_days // 4)
logger.warning(f"reaction_delay_days_max capped to {delay_max}")
return PrivateTimeConfig(
total_simulation_days=total_days,
rounds_per_day=rounds_per_day,
reaction_delay_days_min=delay_min,
reaction_delay_days_max=delay_max,
)
# ── Step 2: Event config ───────────────────────────────────────────────────
def _generate_event_config(
self,
context: str,
simulation_requirement: str,
agent_profiles: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Generate PrivateEventConfig via LLM."""
context_truncated = context[:self.EVENT_CONFIG_CONTEXT_LENGTH]
# Build distance-1 candidate list (agents closest to the decision maker)
distance1_candidates = [
{
"agent_id": a.get("agent_id", i),
"name": a.get("entity_name", ""),
"type": a.get("relational_link_type", ""),
}
for i, a in enumerate(agent_profiles)
if a.get("relational_link_type", "") in (
"manager", "employee", "partner", "familymember"
)
][:10]
candidates_json = json.dumps(distance1_candidates, ensure_ascii=False)
prompt = f"""Based on the following private impact simulation context, generate the event configuration.
{context_truncated}
## Distance-1 agent candidates (closest to decision maker)
{candidates_json}
## Task
Generate an event configuration for private impact injection.
Return a JSON object (no markdown):
{{
"decision_statement": "<precise wording of the decision being injected — 24 sentences>",
"decision_maker_profile": "<short description of the person making the decision — role, authority, relationship style>",
"hot_topics": ["<sensitive topic 1>", "<sensitive topic 2>", ...],
"initial_exposed_agent_ids": [<agent_id>, ...],
"reasoning": "<brief explanation>"
}}
Rules:
- decision_statement must be specific and concrete, not vague
- initial_exposed_agent_ids must only contain ids from the distance-1 candidates list above
- initial_exposed_agent_ids should be 13 agents (direct announcement recipients)
- hot_topics: 36 strings describing the sensitive dimensions of this decision
(e.g. "salary equity", "job security", "family impact", "competitive pressure")"""
system_prompt = (
"You are an expert in organizational decision impact simulation. "
"Return pure JSON only — no markdown. "
f"{get_language_instruction()}"
)
try:
return self._call_llm_with_retry(prompt, system_prompt)
except Exception as e:
logger.warning(f"Event config LLM failed: {e}. Using defaults.")
first_id = agent_profiles[0].get("agent_id", 0) if agent_profiles else 0
return {
"decision_statement": simulation_requirement,
"decision_maker_profile": "The decision maker",
"hot_topics": ["organizational change", "impact"],
"initial_exposed_agent_ids": [first_id],
"reasoning": "Default event config (LLM fallback)",
}
def _parse_event_config(
self,
result: Dict[str, Any],
agent_profiles: List[Dict[str, Any]],
) -> PrivateEventConfig:
"""Parse and validate PrivateEventConfig from LLM result."""
valid_ids = {a.get("agent_id", i) for i, a in enumerate(agent_profiles)}
raw_exposed = result.get("initial_exposed_agent_ids", [])
exposed = [aid for aid in raw_exposed if aid in valid_ids]
if not exposed and agent_profiles:
exposed = [agent_profiles[0].get("agent_id", 0)]
return PrivateEventConfig(
decision_statement=result.get("decision_statement", ""),
decision_maker_profile=result.get("decision_maker_profile", ""),
hot_topics=result.get("hot_topics", []),
initial_exposed_agent_ids=exposed,
)
# ── Steps 3-N: Agent config batches ───────────────────────────────────────
def _generate_agent_configs_batch(
self,
context: str,
agent_profiles: List[Dict[str, Any]],
start_idx: int,
simulation_requirement: str,
) -> List[RelationalActivityConfig]:
"""
Generate a batch of RelationalActivityConfig via LLM with rule-based fallback.
Args:
context: Shared simulation context string.
agent_profiles: Slice of agent dicts for this batch.
start_idx: Index of the first agent in the full list (for logging).
simulation_requirement: Natural language simulation goal.
Returns:
List of RelationalActivityConfig for this batch.
"""
agent_list = []
summary_len = self.AGENT_SUMMARY_LENGTH
for i, a in enumerate(agent_profiles):
agent_list.append({
"agent_id": a.get("agent_id", start_idx + i),
"entity_name": a.get("entity_name", ""),
"relational_link_type": a.get("relational_link_type", "peer"),
"cascade_influence": a.get("cascade_influence", []),
"persona_excerpt": (a.get("persona", "") or "")[:summary_len],
})
prompt = f"""Based on the following private impact simulation context, generate activity configurations for each relational agent.
Simulation requirement: {simulation_requirement}
## Agent list
```json
{json.dumps(agent_list, ensure_ascii=False, indent=2)}
```
## Task
Generate a RelationalActivityConfig for each agent.
Behavioral guidelines by relational type:
- employee: activity_level=0.6, response_delay_max=3 days, stance=neutral
- manager: activity_level=0.5, response_delay_max=1 day, stance=neutral
- client: activity_level=0.3, response_delay_max=7 days, stance=observer
- competitor: activity_level=0.2, response_delay_max=5 days, stance=opposing
- partner: activity_level=0.4, response_delay_max=2 days, stance=neutral
- familymember: activity_level=0.7, response_delay_max=0 days, stance=supportive
Return a JSON object (no markdown):
{{
"agent_configs": [
{{
"agent_id": <must match input>,
"activity_level": <0.01.0>,
"response_delay_min": <int, days>,
"response_delay_max": <int, days>,
"sentiment_bias": <-1.0 to 1.0>,
"stance": "<supportive|opposing|neutral|observer>",
"influence_weight": <float>,
"exposure_round": <int, 0 for initial exposed agents>
}},
...
]
}}
Rules:
- agent_id must match the input exactly
- stance must be one of: supportive, opposing, neutral, observer
- exposure_round = 0 for agents in initial_exposed_agent_ids, else infer from cascade distance"""
system_prompt = (
"You are an expert in private organizational network dynamics. "
"Return pure JSON only — no markdown. "
"IMPORTANT: The 'stance' field MUST be one of: "
"'supportive', 'opposing', 'neutral', 'observer'. "
f"{get_language_instruction()}"
)
try:
result = self._call_llm_with_retry(prompt, system_prompt)
llm_map: Dict[int, Dict[str, Any]] = {
cfg["agent_id"]: cfg for cfg in result.get("agent_configs", [])
}
except Exception as e:
logger.warning(f"Agent config batch LLM failed: {e}. Using rule-based fallback.")
llm_map = {}
configs: List[RelationalActivityConfig] = []
for i, agent in enumerate(agent_profiles):
agent_id = agent.get("agent_id", start_idx + i)
rtype = agent.get("relational_link_type", "employee").lower()
cfg = llm_map.get(agent_id)
if not cfg:
cfg = self._agent_config_by_rule(rtype)
configs.append(RelationalActivityConfig(
agent_id=agent_id,
entity_uuid=agent.get("source_entity_uuid", ""),
entity_name=agent.get("entity_name", ""),
relational_link_type=rtype,
activity_level=float(cfg.get("activity_level", 0.5)),
response_delay_min=int(cfg.get("response_delay_min", 0)),
response_delay_max=int(cfg.get("response_delay_max", 3)),
sentiment_bias=float(cfg.get("sentiment_bias", 0.0)),
stance=cfg.get("stance", "neutral"),
influence_weight=float(cfg.get("influence_weight", 1.0)),
cascade_influence=agent.get("cascade_influence", []),
exposure_round=int(cfg.get("exposure_round", 0)),
))
return configs
def _agent_config_by_rule(self, relational_type: str) -> Dict[str, Any]:
"""
Return rule-based activity config for a given relational type.
Falls back to 'employee' defaults for unknown types.
Args:
relational_type: Lowercase relational type string.
Returns:
Dict with activity_level, response_delay_min/max, sentiment_bias,
stance, influence_weight.
"""
return dict(self.RELATIONAL_FALLBACKS.get(
relational_type, self.RELATIONAL_FALLBACKS["employee"]
))
# ── LLM helpers ───────────────────────────────────────────────────────────
def _call_llm_with_retry(self, prompt: str, system_prompt: str) -> Dict[str, Any]:
"""
Call the LLM with up to 3 retry attempts.
Mirrors SimulationConfigGenerator._call_llm_with_retry with:
- JSON response format enforced
- Temperature annealing on each retry
- Truncation repair via _fix_truncated_json
Args:
prompt: User prompt.
system_prompt: System instructions.
Returns:
Parsed JSON dict.
Raises:
Exception: If all attempts fail.
"""
import time
max_attempts = 3
last_error: Optional[Exception] = None
for attempt in range(max_attempts):
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
temperature=0.7 - (attempt * 0.1),
)
content = response.choices[0].message.content
finish_reason = response.choices[0].finish_reason
if finish_reason == "length":
logger.warning(
f"LLM output truncated (attempt {attempt + 1}), attempting repair..."
)
content = self._fix_truncated_json(content)
try:
return json.loads(content)
except json.JSONDecodeError as e:
logger.warning(f"JSON parse failed (attempt {attempt + 1}): {str(e)[:80]}")
fixed = self._try_fix_config_json(content)
if fixed:
return fixed
last_error = e
except Exception as e:
logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)[:80]}")
last_error = e
time.sleep(2 * (attempt + 1))
raise last_error or Exception("LLM call failed after all retries")
def _fix_truncated_json(self, content: str) -> str:
"""Repair truncated JSON by closing unclosed brackets and strings."""
content = content.strip()
open_braces = content.count("{") - content.count("}")
open_brackets = content.count("[") - content.count("]")
if content and content[-1] not in '",}]':
content += '"'
content += "]" * open_brackets
content += "}" * open_braces
return content
def _try_fix_config_json(self, content: str) -> Optional[Dict[str, Any]]:
"""Attempt to extract and repair a JSON object from malformed LLM output."""
import re
content = self._fix_truncated_json(content)
json_match = re.search(r"\{[\s\S]*\}", content)
if not json_match:
return None
json_str = json_match.group()
def fix_string(match: re.Match) -> str:
s = match.group(0)
s = s.replace("\n", " ").replace("\r", " ")
s = re.sub(r"\s+", " ", s)
return s
json_str = re.sub(r'"[^"\\]*(?:\\.[^"\\]*)*"', fix_string, json_str)
try:
return json.loads(json_str)
except Exception:
json_str = re.sub(r"[\x00-\x1f\x7f-\x9f]", " ", json_str)
json_str = re.sub(r"\s+", " ", json_str)
try:
return json.loads(json_str)
except Exception:
return None

View File

@ -0,0 +1,823 @@
"""
Private Impact Profile Generator
Converts Zep graph entities into RelationalAgentProfile instances for the
Private Impact simulation mode.
Extends OasisProfileGenerator with relational dimensions:
- Relationship type with the decision maker (hierarchical, client, peer, ...)
- Trust level, financial sensitivity, equity tolerance, institutional loyalty
- Natural reaction mode (internalize, confront, silent_leave, coalition_build)
- Cascade influence graph (which agents this agent can expose)
Key design principle (from IDEE-FORK-MIROFISH.md §4):
The `persona` field is the sole behavioral vector injected into the LLM
system prompt each round. All relational dimensions are encoded as natural
language inside `persona` no engine modification required.
"""
import json
import random
import time
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from typing import Any, Dict, List, Optional
import concurrent.futures
from openai import OpenAI
from zep_cloud.client import Zep
from ..config import Config
from ..utils.logger import get_logger
from ..utils.locale import get_language_instruction, get_locale, set_locale
from .oasis_profile_generator import OasisAgentProfile, OasisProfileGenerator
from .zep_entity_reader import EntityNode
logger = get_logger('mirofish.private_impact_profile')
# ── RelationalAgentProfile ────────────────────────────────────────────────────
@dataclass
class RelationalAgentProfile(OasisAgentProfile):
"""
Extended OASIS Agent Profile with relational network dimensions.
All relational fields are encoded into the `persona` text field via
_encode_relational_persona() before being stored. The inherited `persona`
is what gets injected into the LLM system prompt each simulation round.
"""
# Relationship with the decision maker
relational_link_type: str = "peer" # hierarchical | client | peer | family | competitor
relational_seniority_years: int = 0
relational_trust_level: float = 0.5 # 0.0 → 1.0
# Psycho-social dimensions
financial_sensitivity: float = 0.5 # Sensitivity to wealth signals
equity_tolerance: float = 0.5 # Tolerance for status disparities
institutional_loyalty: float = 0.5 # Loyalty to the org vs the person
# Natural reaction mode when facing a triggering decision
private_reaction_mode: str = "internalize" # internalize | confront | silent_leave | coalition_build
# Cascade influence graph: agent_ids this agent can expose
cascade_influence: List[int] = field(default_factory=list)
def to_private_format(self) -> Dict[str, Any]:
"""
Serialize to the format expected by run_private_simulation.py.
The simulation engine reads agent_configs as plain dicts, accessing:
agent_id, entity_name, persona, cascade_influence, active_hours,
activity_level.
"""
return {
"agent_id": self.user_id,
"entity_name": self.name,
"user_name": self.user_name,
"bio": self.bio,
"persona": self.persona, # Encoded with relational context
"cascade_influence": self.cascade_influence,
"relational_link_type": self.relational_link_type,
"relational_seniority_years": self.relational_seniority_years,
"relational_trust_level": self.relational_trust_level,
"financial_sensitivity": self.financial_sensitivity,
"equity_tolerance": self.equity_tolerance,
"institutional_loyalty": self.institutional_loyalty,
"private_reaction_mode": self.private_reaction_mode,
"age": self.age,
"gender": self.gender,
"mbti": self.mbti,
"country": self.country,
"profession": self.profession,
"source_entity_uuid": self.source_entity_uuid,
"source_entity_type": self.source_entity_type,
"created_at": self.created_at,
}
def to_dict(self) -> Dict[str, Any]:
"""Full dict representation including relational fields."""
base = super().to_dict()
base.update({
"relational_link_type": self.relational_link_type,
"relational_seniority_years": self.relational_seniority_years,
"relational_trust_level": self.relational_trust_level,
"financial_sensitivity": self.financial_sensitivity,
"equity_tolerance": self.equity_tolerance,
"institutional_loyalty": self.institutional_loyalty,
"private_reaction_mode": self.private_reaction_mode,
"cascade_influence": self.cascade_influence,
})
return base
# ── PrivateImpactProfileGenerator ────────────────────────────────────────────
class PrivateImpactProfileGenerator(OasisProfileGenerator):
"""
Generates RelationalAgentProfile instances for the Private Impact simulation.
Extends OasisProfileGenerator with:
- Relational entity types (Employee, Manager, Client, ...)
- LLM prompt enriched with relational dimensions
- Relational rule-based fallback by entity type
- persona encoding that injects relational context as natural language
Pipeline (same as OasisProfileGenerator):
EntityNode (Zep) _build_entity_context() LLM / rule-based
RelationalAgentProfile (relational fields encoded into persona)
"""
# Relational entity types — map to default behavioral parameters
RELATIONAL_ENTITY_TYPES = [
"employee", "manager", "client", "competitor",
"partner", "familymember", "colleague", "investor",
]
# Default behavioral parameters by relational type
# (trust_level, financial_sensitivity, equity_tolerance,
# institutional_loyalty, reaction_mode, activity_level, active_hours)
RELATIONAL_DEFAULTS: Dict[str, Dict[str, Any]] = {
"employee": {
"trust_level": 0.6,
"financial_sensitivity": 0.7,
"equity_tolerance": 0.4,
"institutional_loyalty": 0.6,
"reaction_mode": "internalize",
"activity_level": 0.6,
"active_hours": list(range(8, 19)),
"influence_weight": 0.8,
},
"manager": {
"trust_level": 0.7,
"financial_sensitivity": 0.5,
"equity_tolerance": 0.5,
"institutional_loyalty": 0.7,
"reaction_mode": "confront",
"activity_level": 0.5,
"active_hours": list(range(8, 20)),
"influence_weight": 1.5,
},
"client": {
"trust_level": 0.4,
"financial_sensitivity": 0.3,
"equity_tolerance": 0.6,
"institutional_loyalty": 0.3,
"reaction_mode": "silent_leave",
"activity_level": 0.3,
"active_hours": list(range(9, 13)) + list(range(17, 21)),
"influence_weight": 1.2,
},
"competitor": {
"trust_level": 0.2,
"financial_sensitivity": 0.4,
"equity_tolerance": 0.7,
"institutional_loyalty": 0.1,
"reaction_mode": "coalition_build",
"activity_level": 0.2,
"active_hours": list(range(9, 19)),
"influence_weight": 1.0,
},
"partner": {
"trust_level": 0.6,
"financial_sensitivity": 0.4,
"equity_tolerance": 0.6,
"institutional_loyalty": 0.5,
"reaction_mode": "internalize",
"activity_level": 0.4,
"active_hours": list(range(9, 18)),
"influence_weight": 1.3,
},
"familymember": {
"trust_level": 0.8,
"financial_sensitivity": 0.8,
"equity_tolerance": 0.5,
"institutional_loyalty": 0.2,
"reaction_mode": "confront",
"activity_level": 0.7,
"active_hours": list(range(7, 10)) + list(range(18, 24)),
"influence_weight": 0.9,
},
}
# Reaction mode descriptions for LLM prompt injection
REACTION_MODE_DESCRIPTIONS: Dict[str, str] = {
"internalize": "processes the news internally without immediate visible action; absorbs tension before potentially acting later",
"confront": "tends to address the issue head-on, speaking directly to the decision maker or raising concerns openly",
"silent_leave": "quietly disengages — reduces commitment, starts looking for alternatives, without announcing it",
"coalition_build": "looks for allies among peers before taking any visible action; builds shared narratives",
}
def generate_profile_from_entity(
self,
entity: EntityNode,
user_id: int,
use_llm: bool = True,
cascade_influence: Optional[List[int]] = None,
) -> RelationalAgentProfile:
"""
Generate a RelationalAgentProfile from a Zep entity node.
Divergence from OasisProfileGenerator.generate_profile_from_entity:
Returns RelationalAgentProfile instead of OasisAgentProfile.
Relational dimensions are encoded into the persona text field.
Args:
entity: Zep entity node.
user_id: Agent ID in the simulation.
use_llm: Whether to use LLM for profile generation.
cascade_influence: List of agent_ids this agent can expose (optional).
Returns:
RelationalAgentProfile with relational context encoded in persona.
"""
entity_type = entity.get_entity_type() or "peer"
name = entity.name
user_name = self._generate_username(name)
context = self._build_entity_context(entity)
if use_llm:
profile_data = self._generate_relational_profile_with_llm(
entity_name=name,
entity_type=entity_type,
entity_summary=entity.summary,
entity_attributes=entity.attributes,
context=context,
)
else:
profile_data = self._generate_relational_profile_rule_based(
entity_name=name,
entity_type=entity_type,
entity_summary=entity.summary,
)
# Extract relational dimensions from LLM output
relational_link_type = profile_data.get("relational_link_type", "peer")
seniority_years = int(profile_data.get("relational_seniority_years", 0))
trust_level = float(profile_data.get("relational_trust_level", 0.5))
financial_sensitivity = float(profile_data.get("financial_sensitivity", 0.5))
equity_tolerance = float(profile_data.get("equity_tolerance", 0.5))
institutional_loyalty = float(profile_data.get("institutional_loyalty", 0.5))
reaction_mode = profile_data.get("private_reaction_mode", "internalize")
# Clamp floats to [0.0, 1.0]
trust_level = max(0.0, min(1.0, trust_level))
financial_sensitivity = max(0.0, min(1.0, financial_sensitivity))
equity_tolerance = max(0.0, min(1.0, equity_tolerance))
institutional_loyalty = max(0.0, min(1.0, institutional_loyalty))
# Encode relational context into the persona text
base_persona = profile_data.get(
"persona", entity.summary or f"A {entity_type} named {name}."
)
enriched_persona = self._encode_relational_persona(
base_persona=base_persona,
name=name,
relational_link_type=relational_link_type,
seniority_years=seniority_years,
trust_level=trust_level,
financial_sensitivity=financial_sensitivity,
equity_tolerance=equity_tolerance,
institutional_loyalty=institutional_loyalty,
reaction_mode=reaction_mode,
)
return RelationalAgentProfile(
user_id=user_id,
user_name=user_name,
name=name,
bio=profile_data.get("bio", f"{entity_type}: {name}"),
persona=enriched_persona,
karma=profile_data.get("karma", random.randint(500, 3000)),
friend_count=profile_data.get("friend_count", random.randint(20, 300)),
follower_count=profile_data.get("follower_count", random.randint(30, 500)),
statuses_count=profile_data.get("statuses_count", random.randint(50, 1000)),
age=profile_data.get("age"),
gender=profile_data.get("gender"),
mbti=profile_data.get("mbti"),
country=profile_data.get("country"),
profession=profile_data.get("profession"),
interested_topics=profile_data.get("interested_topics", []),
source_entity_uuid=entity.uuid,
source_entity_type=entity_type,
relational_link_type=relational_link_type,
relational_seniority_years=seniority_years,
relational_trust_level=trust_level,
financial_sensitivity=financial_sensitivity,
equity_tolerance=equity_tolerance,
institutional_loyalty=institutional_loyalty,
private_reaction_mode=reaction_mode,
cascade_influence=cascade_influence or [],
)
# ── Persona encoding ──────────────────────────────────────────────────────
def _encode_relational_persona(
self,
base_persona: str,
name: str,
relational_link_type: str,
seniority_years: int,
trust_level: float,
financial_sensitivity: float,
equity_tolerance: float,
institutional_loyalty: float,
reaction_mode: str,
) -> str:
"""
Encode relational dimensions into natural language appended to persona.
This is the central mechanism: OASIS (and our private simulation) inject
the persona field as-is into the LLM system prompt. By appending a
structured relational context block, we guide agent behavior without
modifying the simulation engine.
Args:
base_persona: Base persona text from LLM or rule-based fallback.
name: Agent name.
relational_link_type: Type of relationship with the decision maker.
seniority_years: Years in this relational context.
trust_level: Trust level with decision maker (01).
financial_sensitivity: Sensitivity to wealth signals (01).
equity_tolerance: Tolerance for status disparities (01).
institutional_loyalty: Loyalty to the org vs the person (01).
reaction_mode: Natural reaction pattern.
Returns:
Enriched persona string with relational context block appended.
"""
# Trust descriptor
if trust_level >= 0.75:
trust_desc = "very high"
elif trust_level >= 0.5:
trust_desc = "moderate"
elif trust_level >= 0.25:
trust_desc = "low"
else:
trust_desc = "very low"
# Financial sensitivity descriptor
if financial_sensitivity >= 0.75:
fin_desc = "highly sensitive to wealth signals and perceived inequity"
elif financial_sensitivity >= 0.5:
fin_desc = "moderately sensitive to financial signals"
else:
fin_desc = "relatively indifferent to wealth signals"
# Equity tolerance descriptor
if equity_tolerance <= 0.25:
eq_desc = "very low tolerance for status disparities — notices and resents inequalities"
elif equity_tolerance <= 0.5:
eq_desc = "moderate discomfort with status disparities"
else:
eq_desc = "accepts status differences as normal"
reaction_desc = self.REACTION_MODE_DESCRIPTIONS.get(
reaction_mode,
"processes the situation and responds according to their character"
)
seniority_str = (
f"{seniority_years} year{'s' if seniority_years != 1 else ''}"
if seniority_years > 0 else "recent"
)
loyalty_desc = (
"strongly attached to the organization and its continuity"
if institutional_loyalty >= 0.7
else "balanced between personal interests and organizational ones"
if institutional_loyalty >= 0.4
else "primarily driven by personal interests over institutional ones"
)
relational_block = (
f"\n\n--- Relational Context (Private Impact Simulation) ---\n"
f"Your name is {name}.\n"
f"Your relationship with the decision maker: {relational_link_type} "
f"({seniority_str} of shared history).\n"
f"Trust level with the decision maker: {trust_desc} ({trust_level:.1f}/1.0).\n"
f"Financial sensitivity: {fin_desc} (score: {financial_sensitivity:.1f}).\n"
f"Equity tolerance: {eq_desc} (score: {equity_tolerance:.1f}).\n"
f"Institutional loyalty: {loyalty_desc} (score: {institutional_loyalty:.1f}).\n"
f"Your natural reaction mode: {reaction_mode} — you {reaction_desc}.\n"
f"--- End Relational Context ---"
)
return base_persona + relational_block
# ── LLM profile generation ────────────────────────────────────────────────
def _generate_relational_profile_with_llm(
self,
entity_name: str,
entity_type: str,
entity_summary: str,
entity_attributes: Dict[str, Any],
context: str,
) -> Dict[str, Any]:
"""
Generate relational profile via LLM.
Divergence from OasisProfileGenerator._generate_profile_with_llm:
Adds relational dimension fields to the JSON output schema.
Falls back to rule-based generation on failure (same pattern as parent).
Args:
entity_name: Entity name.
entity_type: Entity type from Zep.
entity_summary: Entity summary from Zep.
entity_attributes: Entity attributes dict.
context: Enriched context from _build_entity_context().
Returns:
Profile data dict including relational dimensions.
"""
prompt = self._build_relational_persona_prompt(
entity_name=entity_name,
entity_type=entity_type,
entity_summary=entity_summary,
entity_attributes=entity_attributes,
context=context,
)
system_prompt = (
"You are an expert in organizational psychology and behavioral simulation. "
"Generate realistic relational agent profiles for private impact simulations. "
"Return valid JSON only — no markdown, no prose outside the JSON object. "
f"{get_language_instruction()}"
)
max_attempts = 3
last_error = None
for attempt in range(max_attempts):
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
],
response_format={"type": "json_object"},
temperature=0.7 - (attempt * 0.1),
)
content = response.choices[0].message.content
finish_reason = response.choices[0].finish_reason
if finish_reason == 'length':
logger.warning(f"LLM output truncated (attempt {attempt + 1}), attempting fix...")
content = self._fix_truncated_json(content)
try:
result = json.loads(content)
# Ensure required fields
if not result.get("bio"):
result["bio"] = entity_summary[:200] if entity_summary else f"{entity_type}: {entity_name}"
if not result.get("persona"):
result["persona"] = entity_summary or f"{entity_name} is a {entity_type}."
return result
except json.JSONDecodeError as je:
logger.warning(f"JSON parse failed (attempt {attempt + 1}): {str(je)[:80]}")
result = self._try_fix_json(content, entity_name, entity_type, entity_summary)
if result.get("_fixed"):
del result["_fixed"]
return result
last_error = je
except Exception as e:
logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)[:80]}")
last_error = e
time.sleep(1 * (attempt + 1))
logger.warning(
f"LLM profile generation failed after {max_attempts} attempts: {last_error}. "
f"Falling back to rule-based."
)
return self._generate_relational_profile_rule_based(
entity_name=entity_name,
entity_type=entity_type,
entity_summary=entity_summary,
)
def _build_relational_persona_prompt(
self,
entity_name: str,
entity_type: str,
entity_summary: str,
entity_attributes: Dict[str, Any],
context: str,
) -> str:
"""
Build the LLM prompt for relational profile generation.
Divergence from parent _build_individual_persona_prompt:
Adds relational dimension fields to the JSON schema.
"""
attrs_str = json.dumps(entity_attributes, ensure_ascii=False) if entity_attributes else "none"
context_str = context[:3000] if context else "No additional context."
return f"""Generate a relational agent profile for a private impact simulation.
Entity name: {entity_name}
Entity type: {entity_type}
Entity summary: {entity_summary}
Entity attributes: {attrs_str}
Context:
{context_str}
Return a JSON object with these fields:
1. bio: Short profile description (max 200 characters)
2. persona: Detailed behavioral description (plain text, no line breaks inside the string, ~500 words):
- Background, personality, professional history
- Emotional patterns and communication style
- Relationship with authority and institutions
- Known reactions to organizational decisions
3. age: Integer (or null)
4. gender: "male", "female", or "other"
5. mbti: MBTI type (e.g. "INTJ") or null
6. country: Country name
7. profession: Current role or function
8. interested_topics: Array of relevant topics
Relational dimensions (required):
9. relational_link_type: One of "hierarchical", "client", "peer", "family", "competitor"
10. relational_seniority_years: Integer (years in this relational context)
11. relational_trust_level: Float 0.01.0 (trust in decision maker)
12. financial_sensitivity: Float 0.01.0 (sensitivity to wealth signals)
13. equity_tolerance: Float 0.01.0 (tolerance for status disparities)
14. institutional_loyalty: Float 0.01.0 (loyalty to org vs personal interests)
15. private_reaction_mode: One of "internalize", "confront", "silent_leave", "coalition_build"
Rules:
- All string values must not contain literal line breaks
- persona must be a single continuous text paragraph
- Float values must be between 0.0 and 1.0
- Infer relational dimensions from entity type and context when possible
"""
# ── Rule-based fallback ───────────────────────────────────────────────────
def _generate_relational_profile_rule_based(
self,
entity_name: str,
entity_type: str,
entity_summary: str,
) -> Dict[str, Any]:
"""
Generate relational profile using predefined defaults by entity type.
Divergence from OasisProfileGenerator._generate_profile_rule_based:
Uses RELATIONAL_DEFAULTS table instead of social media entity types.
Covers: Employee, Manager, Client, Competitor, Partner, FamilyMember.
Args:
entity_name: Entity name.
entity_type: Relational entity type.
entity_summary: Entity summary for persona fallback.
Returns:
Profile data dict with relational dimensions set from defaults.
"""
type_key = entity_type.lower()
defaults = self.RELATIONAL_DEFAULTS.get(type_key, self.RELATIONAL_DEFAULTS["employee"])
base_persona = (
entity_summary
or f"{entity_name} is a {entity_type} connected to the decision maker's network."
)
return {
"bio": (
entity_summary[:150]
if entity_summary
else f"{entity_type}: {entity_name}"
),
"persona": base_persona,
"age": random.randint(25, 55),
"gender": random.choice(["male", "female"]),
"mbti": random.choice(self.MBTI_TYPES),
"country": random.choice(self.COUNTRIES),
"profession": entity_type.capitalize(),
"interested_topics": ["Professional Development", "Organizational Dynamics"],
# Relational dimensions from defaults table
"relational_link_type": type_key if type_key in (
"hierarchical", "client", "peer", "family", "competitor"
) else "peer",
"relational_seniority_years": random.randint(1, 8),
"relational_trust_level": defaults["trust_level"],
"financial_sensitivity": defaults["financial_sensitivity"],
"equity_tolerance": defaults.get("equity_tolerance", 0.5),
"institutional_loyalty": defaults.get("institutional_loyalty", 0.5),
"private_reaction_mode": defaults["reaction_mode"],
}
# ── Batch generation ──────────────────────────────────────────────────────
def generate_profiles_from_entities(
self,
entities: List[EntityNode],
use_llm: bool = True,
progress_callback: Optional[callable] = None,
graph_id: Optional[str] = None,
parallel_count: int = 5,
realtime_output_path: Optional[str] = None,
cascade_influence_map: Optional[Dict[int, List[int]]] = None,
**kwargs, # absorb unused parent kwargs (output_platform, etc.)
) -> List[RelationalAgentProfile]:
"""
Generate RelationalAgentProfile instances for all entities in parallel.
Divergence from OasisProfileGenerator.generate_profiles_from_entities:
Returns RelationalAgentProfile instances.
Accepts cascade_influence_map to assign relational graph edges per agent.
Args:
entities: List of Zep entity nodes.
use_llm: Whether to use LLM generation (falls back to rule-based).
progress_callback: Optional callback(current, total, message).
graph_id: Zep graph ID for context enrichment.
parallel_count: Number of concurrent generation threads.
realtime_output_path: Path to write profiles as they are generated.
cascade_influence_map: {agent_index: [influenced_agent_ids]}.
Returns:
List of RelationalAgentProfile instances.
"""
if graph_id:
self.graph_id = graph_id
cascade_influence_map = cascade_influence_map or {}
total = len(entities)
profiles: List[Optional[RelationalAgentProfile]] = [None] * total
completed_count = [0]
lock = Lock()
def save_realtime() -> None:
"""Write generated profiles to file as they complete."""
if not realtime_output_path:
return
with lock:
existing = [p for p in profiles if p is not None]
if not existing:
return
try:
data = [p.to_private_format() for p in existing]
with open(realtime_output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.warning(f"Realtime save failed: {e}")
current_locale = get_locale()
def generate_single(idx: int, entity: EntityNode) -> tuple:
set_locale(current_locale)
entity_type = entity.get_entity_type() or "peer"
cascade = cascade_influence_map.get(idx, [])
try:
profile = self.generate_profile_from_entity(
entity=entity,
user_id=idx,
use_llm=use_llm,
cascade_influence=cascade,
)
self._print_generated_relational_profile(entity.name, entity_type, profile)
return idx, profile, None
except Exception as e:
logger.error(f"Profile generation failed for {entity.name}: {e}")
fallback = RelationalAgentProfile(
user_id=idx,
user_name=self._generate_username(entity.name),
name=entity.name,
bio=f"{entity_type}: {entity.name}",
persona=(
entity.summary
or f"{entity.name} is a {entity_type} in the relational network."
),
source_entity_uuid=entity.uuid,
source_entity_type=entity_type,
cascade_influence=cascade,
)
return idx, fallback, str(e)
logger.info(
f"Starting parallel profile generation — {total} entities, "
f"parallel_count={parallel_count}"
)
print(f"\n{'='*60}")
print(f"Private Impact — Generating {total} relational profiles (parallel: {parallel_count})")
print(f"{'='*60}\n")
with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_count) as executor:
future_map = {
executor.submit(generate_single, idx, entity): (idx, entity)
for idx, entity in enumerate(entities)
}
for future in concurrent.futures.as_completed(future_map):
idx, entity = future_map[future]
entity_type = entity.get_entity_type() or "peer"
try:
result_idx, profile, error = future.result()
profiles[result_idx] = profile
with lock:
completed_count[0] += 1
current = completed_count[0]
save_realtime()
if progress_callback:
progress_callback(
current,
total,
f"Done {current}/{total}: {entity.name} ({entity_type})"
)
if error:
logger.warning(f"[{current}/{total}] {entity.name} using fallback: {error}")
else:
logger.info(f"[{current}/{total}] Generated: {entity.name} ({entity_type})")
except Exception as e:
logger.error(f"Error processing {entity.name}: {e}")
with lock:
completed_count[0] += 1
profiles[idx] = RelationalAgentProfile(
user_id=idx,
user_name=self._generate_username(entity.name),
name=entity.name,
bio=f"{entity_type}: {entity.name}",
persona=entity.summary or "A participant in the relational network.",
source_entity_uuid=entity.uuid,
source_entity_type=entity_type,
)
save_realtime()
valid_count = len([p for p in profiles if p is not None])
print(f"\n{'='*60}")
print(f"Profile generation complete — {valid_count} relational agents ready")
print(f"{'='*60}\n")
return [p for p in profiles if p is not None]
def save_profiles(
self,
profiles: List[RelationalAgentProfile],
file_path: str,
platform: str = "private",
) -> None:
"""
Save relational profiles to JSON.
Divergence from OasisProfileGenerator.save_profiles:
Always uses to_private_format() no CSV output, no Reddit/Twitter format.
The output is a JSON array of agent config dicts consumed by
run_private_simulation.py.
Args:
profiles: List of RelationalAgentProfile instances.
file_path: Output path (.json).
platform: Ignored always uses private format.
"""
data = [p.to_private_format() for p in profiles]
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"Saved {len(profiles)} relational profiles to {file_path}")
# ── Console output ────────────────────────────────────────────────────────
def _print_generated_relational_profile(
self,
entity_name: str,
entity_type: str,
profile: RelationalAgentProfile,
) -> None:
"""Print a summary of the generated relational profile to stdout."""
separator = "-" * 70
lines = [
f"\n{separator}",
f"[Private Impact] Profile generated: {entity_name} ({entity_type})",
separator,
f"Name: {profile.name} | Link: {profile.relational_link_type} "
f"| Reaction: {profile.private_reaction_mode}",
f"Trust: {profile.relational_trust_level:.2f} "
f"| Fin.Sensitivity: {profile.financial_sensitivity:.2f} "
f"| Loyalty: {profile.institutional_loyalty:.2f}",
f"Cascade influence: {profile.cascade_influence}",
f"",
f"[Bio] {profile.bio}",
separator,
]
print("\n".join(lines))

View File

@ -0,0 +1,903 @@
"""
Private Impact Runner
Orchestrates the Private Impact simulation via subprocess, monitors
private/actions.jsonl for real-time state updates, and exposes the
interface used by the Flask /api/private-impact blueprint.
Equivalent of simulation_runner.py for the Private Impact mode.
Key differences from SimulationRunner:
- Single platform: "private" (no Twitter/Reddit split)
- Action log: {sim_dir}/private/actions.jsonl
- Config file: private_simulation_config.json
- Script: backend/scripts/run_private_simulation.py
- Time unit: simulated days (not hours)
- Cleanup removes private_simulation.db + private/ directory
Note on SimulationLogManager.get_private_logger():
SimulationLogManager (backend/scripts/action_logger.py) does NOT currently
expose get_private_logger(). run_private_simulation.py falls back directly
to PlatformActionLogger("private", simulation_dir). This method must be added
to action_logger.py in a future prompt see CONTEXT.md.
"""
import json
import os
import shutil
import signal
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from ..utils.logger import get_logger
from ..utils.locale import get_locale, set_locale
from .zep_graph_memory_updater import ZepGraphMemoryManager
logger = get_logger('mirofish.private_impact_runner')
IS_WINDOWS = sys.platform == 'win32'
# ── Enums ─────────────────────────────────────────────────────────────────────
class PrivateRunnerStatus(str, Enum):
"""Run state of the Private Impact simulation subprocess."""
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
COMPLETED = "completed"
FAILED = "failed"
# ── Dataclasses ───────────────────────────────────────────────────────────────
@dataclass
class PrivateAgentAction:
"""
Single relational action record parsed from private/actions.jsonl.
Equivalent of AgentAction for the private simulation mode.
No platform split all actions are platform="private".
"""
round_num: int
timestamp: str
agent_id: int
agent_name: str
action_type: str # REACT_PRIVATELY | CONFRONT | COALITION_BUILD |
# SILENT_LEAVE | VOCAL_SUPPORT | DO_NOTHING
action_args: Dict[str, Any] = field(default_factory=dict)
result: Optional[str] = None
success: bool = True
def to_dict(self) -> Dict[str, Any]:
return {
"round_num": self.round_num,
"timestamp": self.timestamp,
"platform": "private",
"agent_id": self.agent_id,
"agent_name": self.agent_name,
"action_type": self.action_type,
"action_args": self.action_args,
"result": self.result,
"success": self.success,
}
@dataclass
class PrivateSimulationRunState:
"""
Real-time run state for a Private Impact simulation.
Equivalent of SimulationRunState for the private mode.
Uses private_* field names no twitter_* / reddit_* split.
"""
simulation_id: str
runner_status: PrivateRunnerStatus = PrivateRunnerStatus.IDLE
# Progress
private_current_round: int = 0
private_total_rounds: int = 0
private_simulated_days: int = 0
private_total_days: int = 0
# Platform state (single: private)
private_running: bool = False
private_actions_count: int = 0
private_completed: bool = False
# Error
private_error: Optional[str] = None
# Recent actions for frontend live display
recent_actions: List[PrivateAgentAction] = field(default_factory=list)
max_recent_actions: int = 50
# Timestamps
started_at: Optional[str] = None
updated_at: str = field(default_factory=lambda: datetime.now().isoformat())
completed_at: Optional[str] = None
# Subprocess PID
process_pid: Optional[int] = None
def add_action(self, action: PrivateAgentAction) -> None:
"""Prepend action to recent_actions and increment actions counter."""
self.recent_actions.insert(0, action)
if len(self.recent_actions) > self.max_recent_actions:
self.recent_actions = self.recent_actions[:self.max_recent_actions]
self.private_actions_count += 1
self.updated_at = datetime.now().isoformat()
def to_dict(self) -> Dict[str, Any]:
total = max(self.private_total_rounds, 1)
return {
"simulation_id": self.simulation_id,
"runner_status": self.runner_status.value,
"private_current_round": self.private_current_round,
"private_total_rounds": self.private_total_rounds,
"private_simulated_days": self.private_simulated_days,
"private_total_days": self.private_total_days,
"progress_percent": round(self.private_current_round / total * 100, 1),
"private_running": self.private_running,
"private_actions_count": self.private_actions_count,
"private_completed": self.private_completed,
"private_error": self.private_error,
"started_at": self.started_at,
"updated_at": self.updated_at,
"completed_at": self.completed_at,
"process_pid": self.process_pid,
}
def to_detail_dict(self) -> Dict[str, Any]:
"""Extended dict including recent actions."""
result = self.to_dict()
result["recent_actions"] = [a.to_dict() for a in self.recent_actions]
return result
# ── PrivateImpactRunner ────────────────────────────────────────────────────────
class PrivateImpactRunner:
"""
Orchestrates Private Impact simulations.
Equivalent of SimulationRunner for the private relational mode.
Launches run_private_simulation.py as a subprocess, monitors
private/actions.jsonl for state updates, and exposes the interface
consumed by the Flask /api/private-impact blueprint.
Directory layout (under RUN_STATE_DIR/{simulation_id}/):
private_simulation_config.json PrivateSimulationParameters.to_dict()
private/actions.jsonl relational action log
simulation.log subprocess stdout + stderr
run_state.json persisted PrivateSimulationRunState
"""
RUN_STATE_DIR = os.path.join(
os.path.dirname(__file__),
'../../uploads/simulations'
)
SCRIPTS_DIR = os.path.join(
os.path.dirname(__file__),
'../../scripts'
)
CONFIG_FILENAME = "private_simulation_config.json"
SCRIPT_NAME = "run_private_simulation.py"
# Class-level in-memory state (same pattern as SimulationRunner)
_run_states: Dict[str, PrivateSimulationRunState] = {}
_processes: Dict[str, subprocess.Popen] = {}
_monitor_threads: Dict[str, threading.Thread] = {}
_stdout_files: Dict[str, Any] = {}
_graph_memory_enabled: Dict[str, bool] = {}
# ── Public API ─────────────────────────────────────────────────────────────
@classmethod
def get_status(cls, simulation_id: str) -> Optional[PrivateSimulationRunState]:
"""
Return the current run state for a simulation.
Checks in-memory cache first, then falls back to disk
(same pattern as SimulationRunner.get_run_state).
Args:
simulation_id: Simulation identifier.
Returns:
PrivateSimulationRunState or None if not found.
"""
if simulation_id in cls._run_states:
return cls._run_states[simulation_id]
return cls._load_run_state(simulation_id)
@classmethod
def start_simulation(
cls,
simulation_id: str,
max_rounds: Optional[int] = None,
enable_graph_memory_update: bool = False,
graph_id: Optional[str] = None,
) -> PrivateSimulationRunState:
"""
Launch the private impact simulation subprocess.
Same mechanics as SimulationRunner.start_simulation (L.387399):
- Reads private_simulation_config.json from the simulation directory
- Spawns run_private_simulation.py with start_new_session=True
- Redirects stdout/stderr to simulation.log
- Launches a background monitor thread
Args:
simulation_id: Unique simulation identifier.
max_rounds: Optional upper bound on simulation rounds.
enable_graph_memory_update: Push activity updates to Zep graph.
graph_id: Required when enable_graph_memory_update=True.
Returns:
PrivateSimulationRunState with status=STARTING.
Raises:
ValueError: If already running, config missing, or graph_id absent.
"""
existing = cls.get_status(simulation_id)
if existing and existing.runner_status in (
PrivateRunnerStatus.RUNNING, PrivateRunnerStatus.STARTING
):
raise ValueError(f"Private simulation already running: {simulation_id}")
sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
config_path = os.path.join(sim_dir, cls.CONFIG_FILENAME)
if not os.path.exists(config_path):
raise ValueError(
f"Private simulation config not found: {config_path}. "
"Call /prepare first to generate the config."
)
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
time_cfg = config.get("time_config", {})
total_days = time_cfg.get("total_simulation_days", 30)
rounds_per_day = time_cfg.get("rounds_per_day", 3)
total_rounds = total_days * rounds_per_day
if max_rounds is not None and max_rounds > 0:
total_rounds = min(total_rounds, max_rounds)
logger.info(
f"[PRIVATE] Rounds capped to {total_rounds} "
f"(max_rounds={max_rounds})"
)
state = PrivateSimulationRunState(
simulation_id=simulation_id,
runner_status=PrivateRunnerStatus.STARTING,
private_total_rounds=total_rounds,
private_total_days=total_days,
private_running=True,
started_at=datetime.now().isoformat(),
)
cls._save_run_state(state)
# Optional Zep graph memory update
if enable_graph_memory_update:
if not graph_id:
raise ValueError(
"graph_id is required when enable_graph_memory_update=True"
)
try:
ZepGraphMemoryManager.create_updater(simulation_id, graph_id)
cls._graph_memory_enabled[simulation_id] = True
logger.info(
f"[PRIVATE] Graph memory update enabled: "
f"simulation_id={simulation_id}, graph_id={graph_id}"
)
except Exception as e:
logger.error(f"[PRIVATE] Failed to create graph memory updater: {e}")
cls._graph_memory_enabled[simulation_id] = False
else:
cls._graph_memory_enabled[simulation_id] = False
script_path = os.path.join(cls.SCRIPTS_DIR, cls.SCRIPT_NAME)
if not os.path.exists(script_path):
raise ValueError(f"Script not found: {script_path}")
try:
cmd = [sys.executable, script_path, "--config", config_path]
if max_rounds is not None and max_rounds > 0:
cmd.extend(["--max-rounds", str(max_rounds)])
main_log_path = os.path.join(sim_dir, "simulation.log")
main_log_file = open(main_log_path, 'w', encoding='utf-8')
env = os.environ.copy()
env['PYTHONUTF8'] = '1'
env['PYTHONIOENCODING'] = 'utf-8'
process = subprocess.Popen(
cmd,
cwd=sim_dir,
stdout=main_log_file,
stderr=subprocess.STDOUT,
text=True,
encoding='utf-8',
bufsize=1,
env=env,
start_new_session=True,
)
cls._stdout_files[simulation_id] = main_log_file
state.process_pid = process.pid
state.runner_status = PrivateRunnerStatus.RUNNING
cls._processes[simulation_id] = process
cls._save_run_state(state)
current_locale = get_locale()
monitor_thread = threading.Thread(
target=cls._monitor_simulation,
args=(simulation_id, current_locale),
daemon=True,
)
monitor_thread.start()
cls._monitor_threads[simulation_id] = monitor_thread
logger.info(
f"[PRIVATE] Simulation started: {simulation_id}, "
f"pid={process.pid}, total_rounds={total_rounds}, "
f"total_days={total_days}"
)
except Exception as e:
state.runner_status = PrivateRunnerStatus.FAILED
state.private_error = str(e)
state.private_running = False
cls._save_run_state(state)
raise
return state
@classmethod
def stop_simulation(cls, simulation_id: str) -> PrivateSimulationRunState:
"""
Stop a running private simulation with a clean SIGTERM.
Same mechanics as SimulationRunner.stop_simulation.
Args:
simulation_id: Simulation identifier.
Returns:
Updated PrivateSimulationRunState with status=STOPPED.
Raises:
ValueError: If simulation does not exist or is not running.
"""
state = cls.get_status(simulation_id)
if not state:
raise ValueError(f"Private simulation not found: {simulation_id}")
if state.runner_status not in (
PrivateRunnerStatus.RUNNING, PrivateRunnerStatus.STARTING
):
raise ValueError(
f"Private simulation is not running: "
f"{simulation_id}, status={state.runner_status}"
)
state.runner_status = PrivateRunnerStatus.STOPPING
cls._save_run_state(state)
process = cls._processes.get(simulation_id)
if process and process.poll() is None:
try:
cls._terminate_process(process, simulation_id)
except ProcessLookupError:
pass
except Exception as e:
logger.error(f"[PRIVATE] Terminate failed: {simulation_id}, {e}")
try:
process.terminate()
process.wait(timeout=5)
except Exception:
process.kill()
state.runner_status = PrivateRunnerStatus.STOPPED
state.private_running = False
state.completed_at = datetime.now().isoformat()
cls._save_run_state(state)
if cls._graph_memory_enabled.get(simulation_id, False):
try:
ZepGraphMemoryManager.stop_updater(simulation_id)
except Exception as e:
logger.error(f"[PRIVATE] Failed to stop graph updater: {e}")
cls._graph_memory_enabled.pop(simulation_id, None)
logger.info(f"[PRIVATE] Simulation stopped: {simulation_id}")
return state
@classmethod
def get_all_actions(
cls,
simulation_id: str,
agent_id: Optional[int] = None,
round_num: Optional[int] = None,
) -> List[PrivateAgentAction]:
"""
Read the complete private/actions.jsonl action history.
Args:
simulation_id: Simulation identifier.
agent_id: Optional filter by agent ID.
round_num: Optional filter by round number.
Returns:
List of PrivateAgentAction sorted by timestamp descending.
"""
sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
log_path = os.path.join(sim_dir, "private", "actions.jsonl")
actions = cls._read_actions_from_file(
log_path, agent_id=agent_id, round_num=round_num
)
actions.sort(key=lambda a: a.timestamp, reverse=True)
return actions
@classmethod
def cleanup(cls, simulation_id: str) -> Dict[str, Any]:
"""
Remove private simulation artifacts to allow a fresh restart.
Deletes:
- run_state.json
- simulation.log
- private_simulation.db
- private/ directory (contains actions.jsonl)
Does NOT delete: private_simulation_config.json, profile files.
Args:
simulation_id: Simulation identifier.
Returns:
Dict with keys: success (bool), cleaned_files (list), errors (list|None).
"""
sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
if not os.path.exists(sim_dir):
return {"success": True, "cleaned_files": [], "errors": None}
cleaned: List[str] = []
errors: List[str] = []
for filename in ("run_state.json", "simulation.log", "private_simulation.db"):
path = os.path.join(sim_dir, filename)
if os.path.exists(path):
try:
os.remove(path)
cleaned.append(filename)
except Exception as e:
errors.append(f"Failed to delete {filename}: {e}")
private_dir = os.path.join(sim_dir, "private")
if os.path.exists(private_dir):
try:
shutil.rmtree(private_dir)
cleaned.append("private/")
except Exception as e:
errors.append(f"Failed to delete private/: {e}")
cls._run_states.pop(simulation_id, None)
logger.info(
f"[PRIVATE] Cleanup done: {simulation_id}, removed={cleaned}"
)
return {
"success": len(errors) == 0,
"cleaned_files": cleaned,
"errors": errors or None,
}
# ── Internal: monitor thread ───────────────────────────────────────────────
@classmethod
def _monitor_simulation(cls, simulation_id: str, locale: str = 'en') -> None:
"""
Background thread: poll private/actions.jsonl until subprocess exits.
Same pattern as SimulationRunner._monitor_simulation (L.482581):
- Loops while process is alive, reading new log lines every 2 s
- Performs a final read after process exit
- Sets COMPLETED or FAILED based on exit code
- Stops graph memory updater in finally block
Args:
simulation_id: Simulation identifier.
locale: Locale inherited from the calling thread.
"""
set_locale(locale)
sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
private_log = os.path.join(sim_dir, "private", "actions.jsonl")
process = cls._processes.get(simulation_id)
state = cls.get_status(simulation_id)
if not process or not state:
return
log_position = 0
try:
while process.poll() is None:
if os.path.exists(private_log):
log_position = cls._read_action_log(
private_log, log_position, state
)
cls._save_run_state(state)
time.sleep(2)
# Final read after process exits
if os.path.exists(private_log):
cls._read_action_log(private_log, log_position, state)
exit_code = process.returncode
if exit_code == 0:
state.runner_status = PrivateRunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat()
logger.info(f"[PRIVATE] Simulation completed: {simulation_id}")
else:
state.runner_status = PrivateRunnerStatus.FAILED
main_log = os.path.join(sim_dir, "simulation.log")
error_tail = ""
try:
if os.path.exists(main_log):
with open(main_log, 'r', encoding='utf-8') as f:
error_tail = f.read()[-2000:]
except Exception:
pass
state.private_error = (
f"Process exited with code {exit_code}. "
f"Last log output: {error_tail}"
)
logger.error(
f"[PRIVATE] Simulation failed: {simulation_id}, "
f"exit_code={exit_code}"
)
state.private_running = False
cls._save_run_state(state)
except Exception as e:
logger.error(f"[PRIVATE] Monitor thread error: {simulation_id}, {e}")
state.runner_status = PrivateRunnerStatus.FAILED
state.private_error = str(e)
cls._save_run_state(state)
finally:
if cls._graph_memory_enabled.get(simulation_id, False):
try:
ZepGraphMemoryManager.stop_updater(simulation_id)
logger.info(
f"[PRIVATE] Graph memory updater stopped: {simulation_id}"
)
except Exception as e:
logger.error(
f"[PRIVATE] Failed to stop graph updater: {e}"
)
cls._graph_memory_enabled.pop(simulation_id, None)
cls._processes.pop(simulation_id, None)
if simulation_id in cls._stdout_files:
try:
cls._stdout_files[simulation_id].close()
except Exception:
pass
cls._stdout_files.pop(simulation_id, None)
# ── Internal: log reader ───────────────────────────────────────────────────
@classmethod
def _read_action_log(
cls,
log_path: str,
position: int,
state: PrivateSimulationRunState,
) -> int:
"""
Incremental read of private/actions.jsonl from a byte offset.
Same pattern as SimulationRunner._read_action_log (L.683684):
- Seeks to last read position, reads new lines only
- Calls ZepGraphMemoryUpdater.add_activity_from_dict(data, "private")
- Handles round_end and simulation_end event entries
Args:
log_path: Absolute path to private/actions.jsonl.
position: Byte offset of the previous read.
state: Mutable run state to update in place.
Returns:
New byte offset after reading.
"""
graph_memory_enabled = cls._graph_memory_enabled.get(
state.simulation_id, False
)
graph_updater = None
if graph_memory_enabled:
graph_updater = ZepGraphMemoryManager.get_updater(state.simulation_id)
try:
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
# Structured event entries (no agent_id)
if "event_type" in data:
event_type = data["event_type"]
if event_type == "simulation_end":
state.private_completed = True
state.private_running = False
state.runner_status = PrivateRunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat()
logger.info(
f"[PRIVATE] simulation_end received: "
f"{state.simulation_id}, "
f"total_rounds={data.get('total_rounds')}, "
f"total_actions={data.get('total_actions')}"
)
elif event_type == "round_end":
round_num = data.get("round", 0)
if round_num > state.private_current_round:
state.private_current_round = round_num
# simulated_day may be written by run_private_simulation.py
simulated_day = data.get("simulated_day", 0)
if simulated_day > state.private_simulated_days:
state.private_simulated_days = simulated_day
continue
# Skip non-agent entries
if "agent_id" not in data:
continue
action = PrivateAgentAction(
round_num=data.get("round", 0),
timestamp=data.get(
"timestamp", datetime.now().isoformat()
),
agent_id=data.get("agent_id", 0),
agent_name=data.get("agent_name", ""),
action_type=data.get("action_type", ""),
action_args=data.get("action_args", {}),
result=data.get("result"),
success=data.get("success", True),
)
state.add_action(action)
if action.round_num > state.private_current_round:
state.private_current_round = action.round_num
# Push to Zep graph memory with platform="private"
if graph_updater:
graph_updater.add_activity_from_dict(data, "private")
except json.JSONDecodeError:
pass
return f.tell()
except Exception as e:
logger.warning(
f"[PRIVATE] Failed to read action log: {log_path}, {e}"
)
return position
# ── Internal: persistence ─────────────────────────────────────────────────
@classmethod
def _save_run_state(cls, state: PrivateSimulationRunState) -> None:
"""Persist run state to run_state.json and update in-memory cache."""
sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id)
os.makedirs(sim_dir, exist_ok=True)
state_file = os.path.join(sim_dir, "run_state.json")
with open(state_file, 'w', encoding='utf-8') as f:
json.dump(state.to_detail_dict(), f, ensure_ascii=False, indent=2)
cls._run_states[state.simulation_id] = state
@classmethod
def _load_run_state(
cls, simulation_id: str
) -> Optional[PrivateSimulationRunState]:
"""
Load run state from disk.
Same pattern as SimulationRunner._load_run_state.
Args:
simulation_id: Simulation identifier.
Returns:
PrivateSimulationRunState or None on failure / missing file.
"""
state_file = os.path.join(
cls.RUN_STATE_DIR, simulation_id, "run_state.json"
)
if not os.path.exists(state_file):
return None
try:
with open(state_file, 'r', encoding='utf-8') as f:
data = json.load(f)
state = PrivateSimulationRunState(
simulation_id=simulation_id,
runner_status=PrivateRunnerStatus(
data.get("runner_status", "idle")
),
private_current_round=data.get("private_current_round", 0),
private_total_rounds=data.get("private_total_rounds", 0),
private_simulated_days=data.get("private_simulated_days", 0),
private_total_days=data.get("private_total_days", 0),
private_running=data.get("private_running", False),
private_actions_count=data.get("private_actions_count", 0),
private_completed=data.get("private_completed", False),
private_error=data.get("private_error"),
started_at=data.get("started_at"),
updated_at=data.get("updated_at", datetime.now().isoformat()),
completed_at=data.get("completed_at"),
process_pid=data.get("process_pid"),
)
for a in data.get("recent_actions", []):
state.recent_actions.append(PrivateAgentAction(
round_num=a.get("round_num", 0),
timestamp=a.get("timestamp", ""),
agent_id=a.get("agent_id", 0),
agent_name=a.get("agent_name", ""),
action_type=a.get("action_type", ""),
action_args=a.get("action_args", {}),
result=a.get("result"),
success=a.get("success", True),
))
cls._run_states[simulation_id] = state
return state
except Exception as e:
logger.error(
f"[PRIVATE] Failed to load run state: {simulation_id}, {e}"
)
return None
@classmethod
def _read_actions_from_file(
cls,
file_path: str,
agent_id: Optional[int] = None,
round_num: Optional[int] = None,
) -> List[PrivateAgentAction]:
"""
Read all agent actions from a JSONL file with optional filters.
Args:
file_path: Path to actions.jsonl.
agent_id: Optional filter by agent ID.
round_num: Optional filter by round number.
Returns:
List of PrivateAgentAction instances.
"""
if not os.path.exists(file_path):
return []
actions: List[PrivateAgentAction] = []
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
if "event_type" in data:
continue
if "agent_id" not in data:
continue
if agent_id is not None and data.get("agent_id") != agent_id:
continue
if round_num is not None and data.get("round") != round_num:
continue
actions.append(PrivateAgentAction(
round_num=data.get("round", 0),
timestamp=data.get("timestamp", ""),
agent_id=data.get("agent_id", 0),
agent_name=data.get("agent_name", ""),
action_type=data.get("action_type", ""),
action_args=data.get("action_args", {}),
result=data.get("result"),
success=data.get("success", True),
))
except json.JSONDecodeError:
continue
return actions
# ── Internal: process management ──────────────────────────────────────────
@classmethod
def _terminate_process(
cls,
process: subprocess.Popen,
simulation_id: str,
timeout: int = 10,
) -> None:
"""
Terminate subprocess and its children cross-platform.
Same implementation as SimulationRunner._terminate_process:
- Windows: taskkill /PID /T, then /F if unresponsive
- Unix: SIGTERM to process group, SIGKILL on timeout
Args:
process: Subprocess to terminate.
simulation_id: Simulation ID for logging.
timeout: Seconds to wait before force-killing.
"""
if IS_WINDOWS:
logger.info(
f"[PRIVATE] Terminating process tree (Windows): "
f"simulation={simulation_id}, pid={process.pid}"
)
try:
subprocess.run(
['taskkill', '/PID', str(process.pid), '/T'],
capture_output=True, timeout=5
)
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
logger.warning(
f"[PRIVATE] Process unresponsive, force killing: "
f"{simulation_id}"
)
subprocess.run(
['taskkill', '/F', '/PID', str(process.pid), '/T'],
capture_output=True, timeout=5
)
process.wait(timeout=5)
except Exception as e:
logger.warning(f"[PRIVATE] taskkill failed, falling back: {e}")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
else:
pgid = os.getpgid(process.pid)
logger.info(
f"[PRIVATE] Terminating process group (Unix): "
f"simulation={simulation_id}, pgid={pgid}"
)
os.killpg(pgid, signal.SIGTERM)
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
logger.warning(
f"[PRIVATE] Process group unresponsive, force killing: "
f"{simulation_id}"
)
os.killpg(pgid, signal.SIGKILL)
process.wait(timeout=5)

View File

@ -125,7 +125,14 @@ class SimulationRunState:
# 平台完成状态(通过检测 actions.jsonl 中的 simulation_end 事件)
twitter_completed: bool = False
reddit_completed: bool = False
# Private Impact platform state
private_current_round: int = 0
private_simulated_days: int = 0
private_running: bool = False
private_actions_count: int = 0
private_completed: bool = False
# 每轮摘要
rounds: List[RoundSummary] = field(default_factory=list)
@ -152,6 +159,8 @@ class SimulationRunState:
if action.platform == "twitter":
self.twitter_actions_count += 1
elif action.platform == "private":
self.private_actions_count += 1
else:
self.reddit_actions_count += 1
@ -177,7 +186,12 @@ class SimulationRunState:
"reddit_completed": self.reddit_completed,
"twitter_actions_count": self.twitter_actions_count,
"reddit_actions_count": self.reddit_actions_count,
"total_actions_count": self.twitter_actions_count + self.reddit_actions_count,
"private_current_round": self.private_current_round,
"private_simulated_days": self.private_simulated_days,
"private_running": self.private_running,
"private_completed": self.private_completed,
"private_actions_count": self.private_actions_count,
"total_actions_count": self.twitter_actions_count + self.reddit_actions_count + self.private_actions_count,
"started_at": self.started_at,
"updated_at": self.updated_at,
"completed_at": self.completed_at,
@ -268,6 +282,11 @@ class SimulationRunner:
reddit_completed=data.get("reddit_completed", False),
twitter_actions_count=data.get("twitter_actions_count", 0),
reddit_actions_count=data.get("reddit_actions_count", 0),
private_current_round=data.get("private_current_round", 0),
private_simulated_days=data.get("private_simulated_days", 0),
private_running=data.get("private_running", False),
private_completed=data.get("private_completed", False),
private_actions_count=data.get("private_actions_count", 0),
started_at=data.get("started_at"),
updated_at=data.get("updated_at", datetime.now().isoformat()),
completed_at=data.get("completed_at"),
@ -391,6 +410,9 @@ class SimulationRunner:
elif platform == "reddit":
script_name = "run_reddit_simulation.py"
state.reddit_running = True
elif platform == "private":
script_name = "run_private_simulation.py"
state.private_running = True
else:
script_name = "run_parallel_simulation.py"
state.twitter_running = True
@ -487,15 +509,17 @@ class SimulationRunner:
# 新的日志结构:分平台的动作日志
twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl")
reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl")
private_actions_log = os.path.join(sim_dir, "private", "actions.jsonl")
process = cls._processes.get(simulation_id)
state = cls.get_run_state(simulation_id)
if not process or not state:
return
twitter_position = 0
reddit_position = 0
private_position = 0
try:
while process.poll() is None: # 进程仍在运行
@ -510,7 +534,13 @@ class SimulationRunner:
reddit_position = cls._read_action_log(
reddit_actions_log, reddit_position, state, "reddit"
)
# 读取 Private 动作日志
if os.path.exists(private_actions_log):
private_position = cls._read_action_log(
private_actions_log, private_position, state, "private"
)
# 更新状态
cls._save_run_state(state)
time.sleep(2)
@ -520,6 +550,8 @@ class SimulationRunner:
cls._read_action_log(twitter_actions_log, twitter_position, state, "twitter")
if os.path.exists(reddit_actions_log):
cls._read_action_log(reddit_actions_log, reddit_position, state, "reddit")
if os.path.exists(private_actions_log):
cls._read_action_log(private_actions_log, private_position, state, "private")
# 进程结束
exit_code = process.returncode
@ -544,8 +576,9 @@ class SimulationRunner:
state.twitter_running = False
state.reddit_running = False
state.private_running = False
cls._save_run_state(state)
except Exception as e:
logger.error(f"监控线程异常: {simulation_id}, error={str(e)}")
state.runner_status = RunnerStatus.FAILED
@ -629,6 +662,10 @@ class SimulationRunner:
state.reddit_completed = True
state.reddit_running = False
logger.info(f"Reddit 模拟已完成: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")
elif platform == "private":
state.private_completed = True
state.private_running = False
logger.info(f"Private 模拟已完成: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")
# 检查是否所有启用的平台都已完成
# 如果只运行了一个平台,只检查那个平台
@ -653,7 +690,13 @@ class SimulationRunner:
if round_num > state.reddit_current_round:
state.reddit_current_round = round_num
state.reddit_simulated_hours = simulated_hours
elif platform == "private":
if round_num > state.private_current_round:
state.private_current_round = round_num
simulated_day = action_data.get("simulated_day", 0)
if simulated_day > state.private_simulated_days:
state.private_simulated_days = simulated_day
# 总体轮次取两个平台的最大值
if round_num > state.current_round:
state.current_round = round_num
@ -703,19 +746,23 @@ class SimulationRunner:
sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id)
twitter_log = os.path.join(sim_dir, "twitter", "actions.jsonl")
reddit_log = os.path.join(sim_dir, "reddit", "actions.jsonl")
private_log = os.path.join(sim_dir, "private", "actions.jsonl")
# 检查哪些平台被启用(通过文件是否存在判断)
twitter_enabled = os.path.exists(twitter_log)
reddit_enabled = os.path.exists(reddit_log)
private_enabled = os.path.exists(private_log)
# 如果平台被启用但未完成,则返回 False
if twitter_enabled and not state.twitter_completed:
return False
if reddit_enabled and not state.reddit_completed:
return False
if private_enabled and not state.private_completed:
return False
# 至少有一个平台被启用且已完成
return twitter_enabled or reddit_enabled
return twitter_enabled or reddit_enabled or private_enabled
@classmethod
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
@ -806,9 +853,10 @@ class SimulationRunner:
state.runner_status = RunnerStatus.STOPPED
state.twitter_running = False
state.reddit_running = False
state.private_running = False
state.completed_at = datetime.now().isoformat()
cls._save_run_state(state)
# 停止图谱记忆更新器
if cls._graph_memory_enabled.get(simulation_id, False):
try:
@ -934,7 +982,18 @@ class SimulationRunner:
agent_id=agent_id,
round_num=round_num
))
# 读取 Private 动作文件
private_actions_log = os.path.join(sim_dir, "private", "actions.jsonl")
if not platform or platform == "private":
actions.extend(cls._read_actions_from_file(
private_actions_log,
default_platform="private",
platform_filter=platform,
agent_id=agent_id,
round_num=round_num
))
# 如果分平台文件不存在,尝试读取旧的单一文件格式
if not actions:
actions_log = os.path.join(sim_dir, "actions.jsonl")
@ -1140,11 +1199,12 @@ class SimulationRunner:
"stderr.log",
"twitter_simulation.db", # Twitter 平台数据库
"reddit_simulation.db", # Reddit 平台数据库
"private_simulation.db", # Private Impact 平台数据库
"env_status.json", # 环境状态文件
]
# 要删除的目录列表(包含动作日志)
dirs_to_clean = ["twitter", "reddit"]
dirs_to_clean = ["twitter", "reddit", "private"]
# 删除文件
for filename in files_to_delete:
@ -1236,6 +1296,7 @@ class SimulationRunner:
state.runner_status = RunnerStatus.STOPPED
state.twitter_running = False
state.reddit_running = False
state.private_running = False
state.completed_at = datetime.now().isoformat()
state.error = "服务器关闭,模拟被终止"
cls._save_run_state(state)

View File

@ -38,7 +38,7 @@ def main():
# 获取运行配置
host = os.environ.get('FLASK_HOST', '0.0.0.0')
port = int(os.environ.get('FLASK_PORT', 5001))
port = int(os.environ.get('FLASK_PORT', 9902))
debug = Config.DEBUG
# 启动服务

View File

@ -132,6 +132,7 @@ class SimulationLogManager:
self.simulation_dir = simulation_dir
self.twitter_logger: Optional[PlatformActionLogger] = None
self.reddit_logger: Optional[PlatformActionLogger] = None
self.private_logger: Optional[PlatformActionLogger] = None
self._main_logger: Optional[logging.Logger] = None
# 设置主日志
@ -177,7 +178,13 @@ class SimulationLogManager:
if self.reddit_logger is None:
self.reddit_logger = PlatformActionLogger("reddit", self.simulation_dir)
return self.reddit_logger
def get_private_logger(self) -> PlatformActionLogger:
"""获取 Private Impact 平台日志记录器"""
if self.private_logger is None:
self.private_logger = PlatformActionLogger("private", self.simulation_dir)
return self.private_logger
def log(self, message: str, level: str = "info"):
"""记录主日志"""
if self._main_logger:

View File

@ -0,0 +1,977 @@
"""
Private Impact Simulation Script
Simulates the impact of a private decision in a closed relational network.
Unlike Twitter/Reddit OASIS simulations, this mode has no social media platform:
no echo chamber, no recency weight, no asyncio.gather() across two platforms.
Differences from run_parallel_simulation.py:
- No OASIS env / agent_graph (no Twitter, no Reddit, no PlatformConfig)
- Single relational simulation (no asyncio.gather parallel platforms)
- Relational actions: REACT_PRIVATELY, CONFRONT, COALITION_BUILD,
SILENT_LEAVE, VOCAL_SUPPORT, DO_NOTHING
- Propagation via cascade_influence graph:
distance 1 = direct exposure (initial_posts targets)
distance 2 = cascade via cascade_influence of reacting agents
- Direct LLM calls via camel-ai ChatAgent (no OASIS action loop)
- Output: backend/scripts/private/actions.jsonl (same JSONL format)
- zep_graph_memory_updater.py reused as-is (platform="private")
Usage:
python run_private_simulation.py --config simulation_config.json
python run_private_simulation.py --config simulation_config.json --no-wait
python run_private_simulation.py --config simulation_config.json --max-rounds 10
Log structure:
sim_xxx/
private/
actions.jsonl # Relational network action log
simulation.log # Main simulation process log
run_state.json # Run state (API polling)
"""
# ============================================================
# Windows UTF-8 fix — same as run_parallel_simulation.py
# ============================================================
import sys
import os
if sys.platform == 'win32':
os.environ.setdefault('PYTHONUTF8', '1')
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
import builtins
_original_open = builtins.open
def _utf8_open(file, mode='r', buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):
"""Wrap open() to default to UTF-8 for text mode — fixes third-party libs."""
if encoding is None and 'b' not in mode:
encoding = 'utf-8'
return _original_open(file, mode, buffering, encoding, errors,
newline, closefd, opener)
builtins.open = _utf8_open
import argparse
import asyncio
import json
import logging
import random
import signal
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple
# ── Path setup (same as run_parallel_simulation.py) ──────────────────────────
_scripts_dir = os.path.dirname(os.path.abspath(__file__))
_backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..'))
_project_root = os.path.abspath(os.path.join(_backend_dir, '..'))
sys.path.insert(0, _scripts_dir)
sys.path.insert(0, _backend_dir)
from dotenv import load_dotenv
_env_file = os.path.join(_project_root, '.env')
if os.path.exists(_env_file):
load_dotenv(_env_file)
print(f"Loaded env config: {_env_file}")
else:
_backend_env = os.path.join(_backend_dir, '.env')
if os.path.exists(_backend_env):
load_dotenv(_backend_env)
print(f"Loaded env config: {_backend_env}")
# ── Logging helpers ───────────────────────────────────────────────────────────
class MaxTokensWarningFilter(logging.Filter):
"""Suppress camel-ai max_tokens warnings (intentionally not set)."""
def filter(self, record: logging.LogRecord) -> bool:
if "max_tokens" in record.getMessage() and "Invalid or missing" in record.getMessage():
return False
return True
logging.getLogger().addFilter(MaxTokensWarningFilter())
from action_logger import SimulationLogManager, PlatformActionLogger
try:
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.agents import ChatAgent
except ImportError as e:
print(f"Error: missing dependency {e}")
print("Please install: pip install camel-ai")
sys.exit(1)
# Optional: Zep graph memory updater — same as run_parallel_simulation.py
try:
from app.services.zep_graph_memory_updater import AgentActivity, ZepGraphMemoryUpdater
_ZEP_AVAILABLE = True
except ImportError:
_ZEP_AVAILABLE = False
# ── Global shutdown state ─────────────────────────────────────────────────────
_shutdown_event: Optional[asyncio.Event] = None
_cleanup_done: bool = False
# ── Relational actions — no social media vocabulary ───────────────────────────
# Divergence from run_parallel_simulation.py:
# TWITTER_ACTIONS / REDDIT_ACTIONS → PRIVATE_ACTIONS
PRIVATE_ACTIONS = [
"REACT_PRIVATELY", # Internal reaction — changes agent state, not visible externally
"CONFRONT", # Direct confrontation with the decision maker
"COALITION_BUILD", # Rallies other network agents to a shared reaction
"SILENT_LEAVE", # Progressive disengagement (resignation, client churn...)
"VOCAL_SUPPORT", # Public or private defense of the decision
"DO_NOTHING", # Indifference — absorbs without reacting
]
# IPC constants (same pattern as run_parallel_simulation.py)
IPC_COMMANDS_DIR = "ipc_commands"
IPC_RESPONSES_DIR = "ipc_responses"
ENV_STATUS_FILE = "env_status.json"
class CommandType:
INTERVIEW = "interview"
BATCH_INTERVIEW = "batch_interview"
CLOSE_ENV = "close_env"
# ── Config / model helpers ────────────────────────────────────────────────────
def load_config(config_path: str) -> Dict[str, Any]:
"""Load simulation config JSON."""
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
def create_model(config: Dict[str, Any]):
"""
Create LLM model from environment variables.
Same logic as run_parallel_simulation.py no boost variant needed here
(single simulation, no parallel platforms to distribute load).
"""
llm_api_key = os.environ.get("LLM_API_KEY", "")
llm_base_url = os.environ.get("LLM_BASE_URL", "")
llm_model = os.environ.get("LLM_MODEL_NAME", "") or config.get("llm_model", "gpt-4o-mini")
if llm_api_key:
os.environ["OPENAI_API_KEY"] = llm_api_key
if not os.environ.get("OPENAI_API_KEY"):
raise ValueError("Missing API Key — set LLM_API_KEY in .env")
if llm_base_url:
os.environ["OPENAI_API_BASE_URL"] = llm_base_url
print(f"[Private] model={llm_model}, base_url={llm_base_url[:40] if llm_base_url else 'default'}...")
return ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=llm_model,
)
def get_agent_names_from_config(config: Dict[str, Any]) -> Dict[int, str]:
"""Build agent_id → entity_name mapping from simulation_config."""
agent_names = {}
for cfg in config.get("agent_configs", []):
agent_id = cfg.get("agent_id")
if agent_id is not None:
agent_names[agent_id] = cfg.get("entity_name", f"Agent_{agent_id}")
return agent_names
# ── Relational graph ──────────────────────────────────────────────────────────
def build_relational_graph(agent_configs: List[Dict[str, Any]]) -> Dict[int, List[int]]:
"""
Build the cascade influence graph from agent configs.
Divergence from run_parallel_simulation.py:
No OASIS platform graph this is a relational influence graph where
cascade_influence[agent_id] = [list of agent_ids this agent can expose].
Returns:
{agent_id: [influenced_agent_ids]}
"""
graph: Dict[int, List[int]] = {}
for cfg in agent_configs:
agent_id = cfg.get("agent_id", 0)
graph[agent_id] = cfg.get("cascade_influence", [])
return graph
def get_initial_exposed_agents(config: Dict[str, Any]) -> Set[int]:
"""
Determine distance-1 agents: those directly targeted by initial_posts.
Divergence from run_parallel_simulation.py:
Instead of posting on Twitter/Reddit, the decision maker (poster_agent_id=0)
announces the decision to agents listed in initial_posts targets.
All agent_ids mentioned in initial_posts (excluding the poster) are exposed.
"""
exposed: Set[int] = set()
event_config = config.get("event_config", {})
for post in event_config.get("initial_posts", []):
poster_id = post.get("poster_agent_id", 0)
# All agents except the poster are exposed at distance 1
for cfg in config.get("agent_configs", []):
agent_id = cfg.get("agent_id")
if agent_id is not None and agent_id != poster_id:
exposed.add(agent_id)
return exposed
def get_decision_context(config: Dict[str, Any]) -> str:
"""
Extract the triggering decision text from event_config.initial_posts.
Reuses the initial_posts mechanism but changes its semantic:
instead of a Twitter post, it is the private decision announcement.
"""
event_config = config.get("event_config", {})
posts = event_config.get("initial_posts", [])
if posts:
return posts[0].get("content", "A private decision has been made.")
return "A private decision has been made."
# ── Active agent selection ────────────────────────────────────────────────────
def get_active_agents_for_round_private(
agent_configs: List[Dict[str, Any]],
exposed_agents: Set[int],
current_hour: int,
round_num: int,
time_config: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""
Select active agents for this round.
Divergence from run_parallel_simulation.py:
Only exposed agents are eligible (relational propagation gate).
Active hours and activity_level logic is preserved from the original.
Args:
agent_configs: Full list of agent configs.
exposed_agents: Set of agent_ids who have received the decision.
current_hour: Simulated hour (0-23).
round_num: Current round number.
time_config: Time configuration from simulation config.
Returns:
List of agent config dicts for agents that will act this round.
"""
base_min = time_config.get("agents_per_hour_min", 3)
base_max = time_config.get("agents_per_hour_max", 10)
peak_hours = time_config.get("peak_hours", [9, 10, 11, 14, 15, 20, 21, 22])
off_peak_hours = time_config.get("off_peak_hours", [0, 1, 2, 3, 4, 5])
if current_hour in peak_hours:
multiplier = time_config.get("peak_activity_multiplier", 1.5)
elif current_hour in off_peak_hours:
multiplier = time_config.get("off_peak_activity_multiplier", 0.3)
else:
multiplier = 1.0
target_count = int(random.uniform(base_min, base_max) * multiplier)
candidates = []
for cfg in agent_configs:
agent_id = cfg.get("agent_id", 0)
# Only exposed agents can act (relational propagation gate)
if agent_id not in exposed_agents:
continue
active_hours = cfg.get("active_hours", list(range(8, 23)))
if current_hour not in active_hours:
continue
activity_level = cfg.get("activity_level", 0.5)
if random.random() < activity_level:
candidates.append(cfg)
selected = random.sample(candidates, min(target_count, len(candidates))) if candidates else []
return selected
# ── LLM agent decision ────────────────────────────────────────────────────────
async def get_agent_action(
agent_config: Dict[str, Any],
decision_context: str,
network_summary: str,
model: Any,
round_num: int,
) -> Dict[str, Any]:
"""
Query LLM for the agent's relational action in the current round.
Divergence from run_parallel_simulation.py:
No OASIS action loop direct ChatAgent call.
The persona field encodes all behavioral dimensions (relational link,
trust level, financial sensitivity, reaction mode).
LLM reads this context and adapts decisions naturally same pattern
as OASIS where persona is injected as-is into the system prompt.
Args:
agent_config: Agent configuration dict (must contain "persona").
decision_context: The triggering decision text.
network_summary: Recent network activity (last N rounds).
model: camel-ai model instance.
round_num: Current round number.
Returns:
{"action_type": str, "action_args": dict}
"""
persona = agent_config.get("persona", "You are a member of a professional network.")
entity_name = agent_config.get(
"entity_name", f"Agent_{agent_config.get('agent_id', 0)}"
)
actions_list = "\n".join(f"- {a}" for a in PRIVATE_ACTIONS)
system_content = (
f"{persona}\n\n"
f"Available actions (choose exactly one):\n{actions_list}\n\n"
"Respond with valid JSON only, no markdown:\n"
'{"action": "<ACTION>", "reasoning": "<brief explanation in 1-2 sentences>", '
'"target_agents": [<agent_ids if COALITION_BUILD, else empty list>]}'
)
user_content = (
f"Round {round_num}.\n\n"
f"Triggering decision:\n{decision_context}\n\n"
f"Recent network activity:\n{network_summary if network_summary else 'No prior activity.'}\n\n"
"What do you do? Choose one action. Respond in JSON only."
)
def _sync_call() -> Dict[str, Any]:
"""Synchronous LLM call — wrapped in asyncio.to_thread to avoid blocking."""
try:
agent = ChatAgent(
system_message=BaseMessage.make_assistant_message(
role_name=entity_name,
content=system_content,
),
model=model,
)
response = agent.step(
BaseMessage.make_user_message(
role_name="Facilitator",
content=user_content,
)
)
text = response.msg.content.strip()
# Strip markdown code fence if present
if "```" in text:
parts = text.split("```")
text = parts[1] if len(parts) > 1 else parts[0]
if text.startswith("json"):
text = text[4:].strip()
data = json.loads(text)
action_type = str(data.get("action", "DO_NOTHING")).upper()
if action_type not in PRIVATE_ACTIONS:
action_type = "DO_NOTHING"
return {
"action_type": action_type,
"action_args": {
"reasoning": data.get("reasoning", ""),
"target_agents": data.get("target_agents", []),
},
}
except Exception:
return {"action_type": "DO_NOTHING", "action_args": {}}
return await asyncio.to_thread(_sync_call)
# ── Private IPC handler ───────────────────────────────────────────────────────
class PrivateIPCHandler:
"""
IPC command handler for the private impact simulation.
Divergence from ParallelIPCHandler in run_parallel_simulation.py:
Single simulation context (no twitter_env / reddit_env).
Interview responses are generated via direct LLM call.
"""
def __init__(
self,
simulation_dir: str,
agent_configs: List[Dict[str, Any]],
model: Any,
):
self.simulation_dir = simulation_dir
self.agent_configs = {cfg["agent_id"]: cfg for cfg in agent_configs}
self.model = model
self.commands_dir = os.path.join(simulation_dir, IPC_COMMANDS_DIR)
self.responses_dir = os.path.join(simulation_dir, IPC_RESPONSES_DIR)
self.status_file = os.path.join(simulation_dir, ENV_STATUS_FILE)
os.makedirs(self.commands_dir, exist_ok=True)
os.makedirs(self.responses_dir, exist_ok=True)
def update_status(self, status: str) -> None:
"""Write current env status to disk."""
with open(self.status_file, 'w', encoding='utf-8') as f:
json.dump({
"status": status,
"platform": "private",
"timestamp": datetime.now().isoformat(),
}, f, ensure_ascii=False, indent=2)
def poll_command(self) -> Optional[Dict[str, Any]]:
"""Poll for pending IPC commands."""
if not os.path.exists(self.commands_dir):
return None
command_files = sorted(
(os.path.join(self.commands_dir, fn), os.path.getmtime(os.path.join(self.commands_dir, fn)))
for fn in os.listdir(self.commands_dir)
if fn.endswith('.json')
)
for filepath, _ in command_files:
try:
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
continue
return None
def send_response(
self,
command_id: str,
status: str,
result: Optional[Dict] = None,
error: Optional[str] = None,
) -> None:
"""Write response file and delete the command file."""
response = {
"command_id": command_id,
"status": status,
"result": result,
"error": error,
"timestamp": datetime.now().isoformat(),
}
response_file = os.path.join(self.responses_dir, f"{command_id}.json")
with open(response_file, 'w', encoding='utf-8') as f:
json.dump(response, f, ensure_ascii=False, indent=2)
command_file = os.path.join(self.commands_dir, f"{command_id}.json")
try:
os.remove(command_file)
except OSError:
pass
async def handle_interview(
self,
command_id: str,
agent_id: int,
prompt: str,
) -> bool:
"""Interview an agent via direct LLM call."""
cfg = self.agent_configs.get(agent_id)
if not cfg:
self.send_response(command_id, "failed", error=f"Agent {agent_id} not found")
return False
persona = cfg.get("persona", "")
entity_name = cfg.get("entity_name", f"Agent_{agent_id}")
def _sync_interview() -> str:
agent = ChatAgent(
system_message=BaseMessage.make_assistant_message(
role_name=entity_name,
content=persona,
),
model=self.model,
)
response = agent.step(
BaseMessage.make_user_message(role_name="Interviewer", content=prompt)
)
return response.msg.content
try:
answer = await asyncio.to_thread(_sync_interview)
self.send_response(command_id, "completed", result={
"agent_id": agent_id,
"agent_name": entity_name,
"platform": "private",
"prompt": prompt,
"response": answer,
"timestamp": datetime.now().isoformat(),
})
print(f" Interview done: agent_id={agent_id}")
return True
except Exception as e:
self.send_response(command_id, "failed", error=str(e))
return False
async def handle_batch_interview(
self,
command_id: str,
interviews: List[Dict[str, Any]],
) -> bool:
"""Batch interview multiple agents."""
tasks = [
self.handle_interview(
f"{command_id}_{i}",
item.get("agent_id", 0),
item.get("prompt", ""),
)
for i, item in enumerate(interviews)
]
results_flags = await asyncio.gather(*tasks)
success_count = sum(results_flags)
if success_count > 0:
self.send_response(command_id, "completed", result={
"interviews_count": success_count,
})
return True
self.send_response(command_id, "failed", error="All interviews failed")
return False
async def process_commands(self) -> bool:
"""
Process pending IPC commands.
Returns:
True to keep running, False to exit.
"""
command = self.poll_command()
if not command:
return True
command_id = command.get("command_id")
command_type = command.get("command_type")
args = command.get("args", {})
print(f"\nIPC command received: {command_type}, id={command_id}")
if command_type == CommandType.INTERVIEW:
await self.handle_interview(
command_id,
args.get("agent_id", 0),
args.get("prompt", ""),
)
return True
if command_type == CommandType.BATCH_INTERVIEW:
await self.handle_batch_interview(
command_id,
args.get("interviews", []),
)
return True
if command_type == CommandType.CLOSE_ENV:
print("Close env command received")
self.send_response(command_id, "completed", result={"message": "Environment closing"})
return False
self.send_response(command_id, "failed", error=f"Unknown command: {command_type}")
return True
# ── Main simulation coroutine ─────────────────────────────────────────────────
async def run_private_simulation(
config: Dict[str, Any],
simulation_dir: str,
action_logger: Optional[PlatformActionLogger] = None,
main_logger: Optional[SimulationLogManager] = None,
max_rounds: Optional[int] = None,
zep_updater: Optional[Any] = None,
) -> Tuple[int, List[Dict[str, Any]]]:
"""
Run the private impact simulation.
Divergence from run_twitter_simulation / run_reddit_simulation:
- No OASIS env direct LLM calls per agent per round
- No PlatformConfig (no recency_weight, no echo_chamber)
- Relational graph drives exposure propagation
- REACT_PRIVATELY does NOT cascade (internal reaction, invisible)
- All other non-DO_NOTHING actions cascade to cascade_influence targets
Args:
config: Simulation config dict.
simulation_dir: Directory for log output.
action_logger: PlatformActionLogger instance ("private" platform).
main_logger: SimulationLogManager for main simulation.log.
max_rounds: Optional round cap.
zep_updater: Optional ZepGraphMemoryUpdater (reused as-is).
Returns:
(total_actions, agent_configs) for IPC handler initialisation.
"""
def log(msg: str) -> None:
if main_logger:
main_logger.info(f"[Private] {msg}")
print(f"[Private] {msg}")
log("Initializing...")
agent_configs = config.get("agent_configs", [])
agent_names = get_agent_names_from_config(config)
time_config = config.get("time_config", {})
# Build relational cascade graph
relational_graph = build_relational_graph(agent_configs)
# Agents exposed to the decision at simulation start (distance 1)
exposed_agents: Set[int] = get_initial_exposed_agents(config)
log(f"Distance-1 exposed agents: {sorted(exposed_agents)}")
# The triggering decision text (from event_config.initial_posts)
decision_context = get_decision_context(config)
log(f"Decision context: {decision_context[:100]}...")
model = create_model(config)
if action_logger:
action_logger.log_simulation_start(config)
total_actions = 0
# Round 0 — log the decision injection (mirrors initial_posts in OASIS)
if action_logger:
action_logger.log_round_start(0, 0)
event_config = config.get("event_config", {})
initial_posts = event_config.get("initial_posts", [])
initial_count = 0
for post in initial_posts:
poster_id = post.get("poster_agent_id", 0)
content = post.get("content", "")
poster_name = agent_names.get(poster_id, f"Agent_{poster_id}")
if action_logger:
action_logger.log_action(
round_num=0,
agent_id=poster_id,
agent_name=poster_name,
action_type="CREATE_POST",
action_args={"content": content},
)
total_actions += 1
initial_count += 1
if zep_updater and _ZEP_AVAILABLE:
zep_updater.add_activity_from_dict({
"agent_id": poster_id,
"agent_name": poster_name,
"action_type": "CREATE_POST",
"action_args": {"content": content},
"round": 0,
"timestamp": datetime.now().isoformat(),
}, platform="private")
log(f"Decision injected: {initial_count} initial post(s)")
if action_logger:
action_logger.log_round_end(0, initial_count)
# Compute total rounds
total_hours = time_config.get("total_simulation_hours", 72)
minutes_per_round = time_config.get("minutes_per_round", 30)
total_rounds = (total_hours * 60) // minutes_per_round
if max_rounds is not None and max_rounds > 0:
original_rounds = total_rounds
total_rounds = min(total_rounds, max_rounds)
if total_rounds < original_rounds:
log(f"Rounds capped: {original_rounds}{total_rounds} (max_rounds={max_rounds})")
# Rolling network activity log for LLM context (last 10 visible actions)
network_log: List[str] = []
start_time = datetime.now()
for round_num in range(total_rounds):
if _shutdown_event and _shutdown_event.is_set():
log(f"Shutdown signal received, stopping at round {round_num + 1}")
break
simulated_minutes = round_num * minutes_per_round
simulated_hour = (simulated_minutes // 60) % 24
simulated_day = simulated_minutes // (60 * 24) + 1
active_cfgs = get_active_agents_for_round_private(
agent_configs, exposed_agents, simulated_hour, round_num, time_config
)
if action_logger:
action_logger.log_round_start(round_num + 1, simulated_hour)
if not active_cfgs:
if action_logger:
action_logger.log_round_end(round_num + 1, 0)
continue
# Build context summary for LLM prompts this round
network_summary = "\n".join(network_log[-10:])
# Query all active agents concurrently
tasks = [
get_agent_action(
cfg, decision_context, network_summary, model, round_num + 1
)
for cfg in active_cfgs
]
action_results = await asyncio.gather(*tasks)
round_action_count = 0
newly_exposed: Set[int] = set()
for cfg, result in zip(active_cfgs, action_results):
agent_id = cfg.get("agent_id", 0)
agent_name = agent_names.get(agent_id, f"Agent_{agent_id}")
action_type = result["action_type"]
action_args = result["action_args"]
if action_logger:
action_logger.log_action(
round_num=round_num + 1,
agent_id=agent_id,
agent_name=agent_name,
action_type=action_type,
action_args=action_args,
)
total_actions += 1
round_action_count += 1
if zep_updater and _ZEP_AVAILABLE:
zep_updater.add_activity_from_dict({
"agent_id": agent_id,
"agent_name": agent_name,
"action_type": action_type,
"action_args": action_args,
"round": round_num + 1,
"timestamp": datetime.now().isoformat(),
}, platform="private")
# Rolling network log — only visible actions update context
if action_type not in ("DO_NOTHING",):
reasoning = action_args.get("reasoning", "")
entry = f"[Round {round_num + 1}] {agent_name}: {action_type}"
if reasoning:
entry += f"{reasoning[:80]}"
network_log.append(entry)
# Propagate exposure via cascade_influence.
# REACT_PRIVATELY is invisible externally — does NOT cascade.
# All other non-DO_NOTHING actions cascade to influenced agents.
if action_type not in ("DO_NOTHING", "REACT_PRIVATELY"):
for influenced_id in relational_graph.get(agent_id, []):
if influenced_id not in exposed_agents:
newly_exposed.add(influenced_id)
influenced_name = agent_names.get(influenced_id, f"Agent_{influenced_id}")
log(f"{influenced_name} exposed via {agent_name}'s {action_type}")
# Expand exposed set with newly reached agents
exposed_agents.update(newly_exposed)
if action_logger:
action_logger.log_round_end(round_num + 1, round_action_count)
if (round_num + 1) % 20 == 0:
progress = (round_num + 1) / total_rounds * 100
log(
f"Day {simulated_day}, {simulated_hour:02d}:00 "
f"— Round {round_num + 1}/{total_rounds} ({progress:.1f}%) "
f"| Exposed: {len(exposed_agents)}/{len(agent_configs)}"
)
if action_logger:
action_logger.log_simulation_end(total_rounds, total_actions)
elapsed = (datetime.now() - start_time).total_seconds()
log(f"Simulation complete! Elapsed: {elapsed:.1f}s, Total actions: {total_actions}")
return total_actions, agent_configs
# ── Entry point ───────────────────────────────────────────────────────────────
async def main() -> None:
parser = argparse.ArgumentParser(description='Private Impact Simulation')
parser.add_argument(
'--config',
type=str,
required=True,
help='Path to simulation_config.json',
)
parser.add_argument(
'--max-rounds',
type=int,
default=None,
help='Maximum number of simulation rounds (optional cap)',
)
parser.add_argument(
'--no-wait',
action='store_true',
default=False,
help='Close environment immediately after simulation (no IPC wait)',
)
args = parser.parse_args()
global _shutdown_event
_shutdown_event = asyncio.Event()
if not os.path.exists(args.config):
print(f"Error: config file not found: {args.config}")
sys.exit(1)
config = load_config(args.config)
simulation_dir = os.path.dirname(args.config) or "."
wait_for_commands = not args.no_wait
# Suppress OASIS loggers (precaution if imported transitively)
for logger_name in ("social.agent", "social.twitter", "social.rec", "oasis.env", "table"):
lg = logging.getLogger(logger_name)
lg.setLevel(logging.CRITICAL)
lg.handlers.clear()
lg.propagate = False
log_manager = SimulationLogManager(simulation_dir)
private_logger = log_manager.get_private_logger()
log_manager.info("=" * 60)
log_manager.info("Private Impact Simulation")
log_manager.info(f"Config: {args.config}")
log_manager.info(f"Simulation ID: {config.get('simulation_id', 'unknown')}")
log_manager.info(f"Wait mode: {'enabled' if wait_for_commands else 'disabled'}")
log_manager.info("=" * 60)
time_config = config.get("time_config", {})
total_hours = time_config.get("total_simulation_hours", 72)
minutes_per_round = time_config.get("minutes_per_round", 30)
config_total_rounds = (total_hours * 60) // minutes_per_round
log_manager.info("Simulation parameters:")
log_manager.info(f" - Total simulated duration: {total_hours}h")
log_manager.info(f" - Minutes per round: {minutes_per_round}")
log_manager.info(f" - Config total rounds: {config_total_rounds}")
if args.max_rounds:
log_manager.info(f" - Round cap: {args.max_rounds}")
log_manager.info(f" - Agent count: {len(config.get('agent_configs', []))}")
log_manager.info("Log structure:")
log_manager.info(f" - Main log: simulation.log")
log_manager.info(f" - Private actions: private/actions.jsonl")
log_manager.info("=" * 60)
start_time = datetime.now()
total_actions, agent_configs = await run_private_simulation(
config=config,
simulation_dir=simulation_dir,
action_logger=private_logger,
main_logger=log_manager,
max_rounds=args.max_rounds,
)
total_elapsed = (datetime.now() - start_time).total_seconds()
log_manager.info("=" * 60)
log_manager.info(f"Simulation loop complete! Elapsed: {total_elapsed:.1f}s")
if wait_for_commands:
log_manager.info("")
log_manager.info("=" * 60)
log_manager.info("Waiting for commands — environment active")
log_manager.info("Supported: interview, batch_interview, close_env")
log_manager.info("=" * 60)
model = create_model(config)
ipc_handler = PrivateIPCHandler(
simulation_dir=simulation_dir,
agent_configs=agent_configs,
model=model,
)
ipc_handler.update_status("alive")
try:
while not _shutdown_event.is_set():
should_continue = await ipc_handler.process_commands()
if not should_continue:
break
try:
await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5)
break
except asyncio.TimeoutError:
pass
except KeyboardInterrupt:
print("\nInterrupt received")
except asyncio.CancelledError:
print("\nTask cancelled")
except Exception as e:
print(f"\nCommand processing error: {e}")
log_manager.info("\nShutting down...")
ipc_handler.update_status("stopped")
log_manager.info("=" * 60)
log_manager.info("All done!")
log_manager.info(f" - {os.path.join(simulation_dir, 'simulation.log')}")
log_manager.info(f" - {os.path.join(simulation_dir, 'private', 'actions.jsonl')}")
log_manager.info("=" * 60)
def setup_signal_handlers() -> None:
"""
Register SIGTERM/SIGINT handlers.
Same pattern as run_parallel_simulation.py sets _shutdown_event
instead of calling sys.exit() directly to allow graceful cleanup.
"""
def signal_handler(signum: int, frame: Any) -> None:
global _cleanup_done
sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
print(f"\n{sig_name} received, shutting down...")
if not _cleanup_done:
_cleanup_done = True
if _shutdown_event:
_shutdown_event.set()
else:
print("Forced exit...")
sys.exit(1)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
if __name__ == "__main__":
setup_signal_handlers()
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nProgram interrupted")
except SystemExit:
pass
finally:
try:
from multiprocessing import resource_tracker
resource_tracker._resource_tracker._stop()
except Exception:
pass
print("Simulation process exited")

View File

@ -1,14 +0,0 @@
services:
mirofish:
image: ghcr.io/666ghj/mirofish:latest
# 加速镜像(如拉取缓慢可替换上方地址)
# image: ghcr.nju.edu.cn/666ghj/mirofish:latest
container_name: mirofish
env_file:
- .env
ports:
- "3000:3000"
- "5001:5001"
restart: unless-stopped
volumes:
- ./backend/uploads:/app/backend/uploads

View File

@ -1435,7 +1435,6 @@
"resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz",
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"license": "ISC",
"peer": true,
"engines": {
"node": ">=12"
}
@ -1913,7 +1912,6 @@
"integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@ -2053,7 +2051,6 @@
"integrity": "sha512-ITcnkFeR3+fI8P1wMgItjGrR10170d8auB4EpMLPqmx6uxElH3a/hHGQabSHKdqd4FXWO1nFIp9rRn7JQ34ACQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.5.0",
@ -2128,7 +2125,6 @@
"resolved": "https://registry.npmjs.org/vue/-/vue-3.5.25.tgz",
"integrity": "sha512-YLVdgv2K13WJ6n+kD5owehKtEXwdwXuj2TTyJMsO7pSeKw2bfRNZGjhB7YzrpbMYj5b5QsUebHpOqR3R3ziy/g==",
"license": "MIT",
"peer": true,
"dependencies": {
"@vue/compiler-dom": "3.5.25",
"@vue/compiler-sfc": "3.5.25",

View File

@ -3,7 +3,7 @@ import i18n from '../i18n'
// 创建axios实例
const service = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:5001',
baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:9902',
timeout: 300000, // 5分钟超时本体生成可能需要较长时间
headers: {
'Content-Type': 'application/json'

View File

@ -0,0 +1,22 @@
import service, { requestWithRetry } from './index'
export const preparePrivateSimulation = (data) =>
requestWithRetry(() => service.post('/api/private-impact/prepare', data), 3, 2000)
export const startPrivateSimulation = (data) =>
requestWithRetry(() => service.post('/api/private-impact/start', data), 3, 1000)
export const getPrivateStatus = (simId) =>
service.get(`/api/private-impact/status/${simId}`)
export const stopPrivateSimulation = (simId) =>
service.post(`/api/private-impact/stop/${simId}`)
export const getPrivateActions = (simId, params = {}) =>
service.get(`/api/private-impact/actions/${simId}`, { params })
export const generatePrivateReport = (simId, data = {}) =>
requestWithRetry(() => service.post(`/api/private-impact/report/${simId}`, data), 3, 1000)
export const cleanupPrivateSimulation = (simId) =>
service.delete(`/api/private-impact/cleanup/${simId}`)

View File

@ -0,0 +1,222 @@
<template>
<div class="mode-selector">
<div class="selector-header">
<span class="selector-label">SELECT SIMULATION MODE</span>
<p class="selector-hint">Choose how you want to run your impact analysis</p>
</div>
<div class="mode-cards">
<!-- Public Mode -->
<button
class="mode-card"
:class="{ 'is-selected': selected === 'public' }"
@click="select('public')"
>
<div class="card-icon">
<svg viewBox="0 0 24 24" width="32" height="32" fill="none" stroke="currentColor" stroke-width="1.5">
<circle cx="12" cy="12" r="10" />
<line x1="2" y1="12" x2="22" y2="12" />
<path d="M12 2a15.3 15.3 0 0 1 4 10 15.3 15.3 0 0 1-4 10 15.3 15.3 0 0 1-4-10 15.3 15.3 0 0 1 4-10z" />
</svg>
</div>
<div class="card-body">
<div class="card-title">Public Opinion</div>
<div class="card-subtitle">Twitter / Reddit</div>
<p class="card-desc">Simulate how a decision, event, or message propagates through open social networks.</p>
<div class="card-tags">
<span class="tag">Social Media</span>
<span class="tag">Public Sentiment</span>
<span class="tag">Virality</span>
</div>
</div>
<div class="card-check">
<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2.5">
<polyline points="20 6 9 17 4 12" />
</svg>
</div>
</button>
<!-- Private Impact Mode -->
<button
class="mode-card mode-card--private"
:class="{ 'is-selected': selected === 'private' }"
@click="select('private')"
>
<div class="card-icon">
<svg viewBox="0 0 24 24" width="32" height="32" fill="none" stroke="currentColor" stroke-width="1.5">
<rect x="3" y="11" width="18" height="11" rx="2" ry="2" />
<path d="M7 11V7a5 5 0 0 1 10 0v4" />
</svg>
</div>
<div class="card-body">
<div class="card-title">Private Impact</div>
<div class="card-subtitle">Closed Relational Network</div>
<p class="card-desc">Simulate how a private decision propagates through a relational network employees, clients, partners.</p>
<div class="card-tags">
<span class="tag">Org Network</span>
<span class="tag">Decision Impact</span>
<span class="tag">Confidential</span>
</div>
</div>
<div class="card-check">
<svg viewBox="0 0 24 24" width="16" height="16" fill="none" stroke="currentColor" stroke-width="2.5">
<polyline points="20 6 9 17 4 12" />
</svg>
</div>
</button>
</div>
</div>
</template>
<script setup>
import { ref } from 'vue'
const emit = defineEmits(['mode-selected'])
const selected = ref(null)
const select = (mode) => {
selected.value = mode
emit('mode-selected', mode)
}
</script>
<style scoped>
.mode-selector {
padding: 0;
}
.selector-header {
margin-bottom: 20px;
}
.selector-label {
display: block;
font-size: 10px;
font-weight: 600;
letter-spacing: 0.12em;
color: #999;
margin-bottom: 6px;
}
.selector-hint {
font-size: 13px;
color: #555;
}
.mode-cards {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
}
.mode-card {
position: relative;
display: flex;
flex-direction: column;
align-items: flex-start;
text-align: left;
padding: 20px;
border: 1.5px solid #E0E0E0;
border-radius: 4px;
background: #fff;
cursor: pointer;
transition: border-color 0.18s, box-shadow 0.18s, background 0.18s;
width: 100%;
gap: 14px;
}
.mode-card:hover {
border-color: #000;
box-shadow: 0 2px 12px rgba(0, 0, 0, 0.06);
}
.mode-card.is-selected {
border-color: #000;
background: #FAFAFA;
}
.mode-card--private:hover,
.mode-card--private.is-selected {
border-color: #1A1A1A;
background: #F8F8F8;
}
.card-icon {
color: #333;
flex-shrink: 0;
}
.card-body {
flex: 1;
min-width: 0;
}
.card-title {
font-size: 14px;
font-weight: 700;
color: #000;
margin-bottom: 2px;
letter-spacing: 0.02em;
}
.card-subtitle {
font-size: 10px;
font-weight: 600;
letter-spacing: 0.1em;
color: #888;
text-transform: uppercase;
margin-bottom: 8px;
}
.card-desc {
font-size: 12px;
color: #555;
line-height: 1.5;
margin-bottom: 10px;
}
.card-tags {
display: flex;
flex-wrap: wrap;
gap: 4px;
}
.tag {
font-size: 10px;
font-weight: 500;
color: #666;
background: #F0F0F0;
border-radius: 2px;
padding: 2px 6px;
letter-spacing: 0.04em;
}
.card-check {
position: absolute;
top: 12px;
right: 12px;
width: 20px;
height: 20px;
border-radius: 50%;
background: #000;
color: #fff;
display: flex;
align-items: center;
justify-content: center;
opacity: 0;
transform: scale(0.7);
transition: opacity 0.15s, transform 0.15s;
}
.mode-card.is-selected .card-check {
opacity: 1;
transform: scale(1);
}
@media (max-width: 640px) {
.mode-cards {
grid-template-columns: 1fr;
}
}
</style>

View File

@ -5,6 +5,7 @@ import SimulationView from '../views/SimulationView.vue'
import SimulationRunView from '../views/SimulationRunView.vue'
import ReportView from '../views/ReportView.vue'
import InteractionView from '../views/InteractionView.vue'
import PrivateImpactView from '../views/PrivateImpactView.vue'
const routes = [
{
@ -41,6 +42,12 @@ const routes = [
name: 'Interaction',
component: InteractionView,
props: true
},
{
path: '/private/:projectId',
name: 'PrivateImpact',
component: PrivateImpactView,
props: true
}
]

View File

@ -125,6 +125,9 @@
<!-- 右栏交互控制台 -->
<div class="right-panel">
<div class="mode-selector-wrapper">
<ModeSelector @mode-selected="handleModeSelected" />
</div>
<div class="console-box">
<!-- 上传区域 -->
<div class="console-section">
@ -216,9 +219,22 @@ import { ref, computed } from 'vue'
import { useRouter } from 'vue-router'
import HistoryDatabase from '../components/HistoryDatabase.vue'
import LanguageSwitcher from '../components/LanguageSwitcher.vue'
import ModeSelector from '../components/ModeSelector.vue'
const router = useRouter()
// Mode sélectionné (public | private)
const selectedMode = ref(null)
const handleModeSelected = (mode) => {
selectedMode.value = mode
if (mode === 'private') {
sessionStorage.setItem('pendingSimMode', 'private')
} else {
sessionStorage.removeItem('pendingSimMode')
}
}
//
const formData = ref({
simulationRequirement: ''
@ -672,6 +688,10 @@ const startSimulation = () => {
flex: 1.2;
}
.mode-selector-wrapper {
margin-bottom: 20px;
}
.console-box {
border: 1px solid #CCC; /* 外部实线 */
padding: 8px; /* 内边距形成双重边框感 */

View File

@ -215,6 +215,13 @@ const handleNewProject = async () => {
currentProjectId.value = res.data.project_id
projectData.value = res.data
const pendingMode = sessionStorage.getItem('pendingSimMode')
sessionStorage.removeItem('pendingSimMode')
if (pendingMode === 'private') {
router.push(`/private/${res.data.project_id}`)
return
}
router.replace({ name: 'Process', params: { projectId: res.data.project_id } })
ontologyProgress.value = null
addLog(`Ontology generated successfully for project ${res.data.project_id}`)

File diff suppressed because it is too large Load Diff

View File

@ -12,11 +12,11 @@ export default defineConfig({
}
},
server: {
port: 3000,
port: 9901,
open: true,
proxy: {
'/api': {
target: 'http://localhost:5001',
target: 'http://localhost:9902',
changeOrigin: true,
secure: false
}