From 89c436411ba5989d2ae6200f573f45ce166482ed Mon Sep 17 00:00:00 2001 From: Cyril Date: Thu, 16 Apr 2026 15:04:22 +0200 Subject: [PATCH] feat: add Private Impact simulation mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New simulation mode for private decision impact in closed relational networks - RelationalAgentProfile extending OasisAgentProfile with 8 relational fields - run_private_simulation.py — standalone engine without social platform - PrivateImpactProfileGenerator, PrivateImpactConfigGenerator, PrivateImpactRunner - Blueprint /api/private-impact with 7 endpoints - ModeSelector.vue + PrivateImpactView.vue (5-step wizard) - 6 surgical extensions in simulation_runner.py - Zero breaking changes to existing public mode --- .dockerignore | 23 - CONTEXT.md | 131 ++ Dockerfile | 29 - backend/app/__init__.py | 2 + backend/app/api/__init__.py | 2 + backend/app/api/private.py | 558 ++++++ .../private_impact_config_generator.py | 735 ++++++++ .../private_impact_profile_generator.py | 823 +++++++++ backend/app/services/private_impact_runner.py | 903 ++++++++++ backend/app/services/simulation_runner.py | 93 +- backend/run.py | 2 +- backend/scripts/action_logger.py | 9 +- backend/scripts/run_private_simulation.py | 977 +++++++++++ docker-compose.yml | 14 - frontend/package-lock.json | 4 - frontend/src/api/index.js | 2 +- frontend/src/api/private.js | 22 + frontend/src/components/ModeSelector.vue | 222 +++ frontend/src/router/index.js | 7 + frontend/src/views/Home.vue | 20 + frontend/src/views/MainView.vue | 7 + frontend/src/views/PrivateImpactView.vue | 1554 +++++++++++++++++ frontend/vite.config.js | 4 +- 23 files changed, 6052 insertions(+), 91 deletions(-) delete mode 100644 .dockerignore create mode 100644 CONTEXT.md delete mode 100644 Dockerfile create mode 100644 backend/app/api/private.py create mode 100644 backend/app/services/private_impact_config_generator.py create mode 100644 backend/app/services/private_impact_profile_generator.py create mode 100644 backend/app/services/private_impact_runner.py create mode 100644 backend/scripts/run_private_simulation.py delete mode 100644 docker-compose.yml create mode 100644 frontend/src/api/private.js create mode 100644 frontend/src/components/ModeSelector.vue create mode 100644 frontend/src/views/PrivateImpactView.vue diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 0e84a5ac..00000000 --- a/.dockerignore +++ /dev/null @@ -1,23 +0,0 @@ -.git -.github -.gitignore -.cursor -.DS_Store -.env - -node_modules -frontend/node_modules -backend/.venv -.venv -.python-version - -__pycache__ -*.pyc -.pytest_cache -.mypy_cache -.ruff_cache - -frontend/dist -frontend/.vite - -backend/uploads diff --git a/CONTEXT.md b/CONTEXT.md new file mode 100644 index 00000000..1c7e3e9f --- /dev/null +++ b/CONTEXT.md @@ -0,0 +1,131 @@ +# MiroFish — Fork Context (Private Impact Feature) + +## Branche de travail +`feature/private-impact` + +## Remotes +- `origin` → https://github.com/CyrilDEVIA/MiroResult.git (fork perso) +- `upstream` → https://github.com/666ghj/MiroFish.git (repo original) + +--- + +## Historique des sessions + +### 2026-04-16 — Session 1 + +#### Étapes terminées +- [x] **Prompt N°01** — Lecture complète du code source (audit, zéro modification) +- [x] **Prompt N°02** — Setup Git : fork + remote + branche `feature/private-impact` +- [x] **Prompt N°03** — Création `backend/scripts/run_private_simulation.py` +- [x] **Prompt N°04** — Création `backend/app/services/private_impact_profile_generator.py` +- [x] **Prompt N°05** — Création `backend/app/services/private_impact_config_generator.py` +- [x] **Prompt N°06** — Création `backend/app/services/private_impact_runner.py` +- [x] **Prompt N°07** — Blueprint `backend/app/api/private.py` + enregistrement (`api/__init__.py`, `app/__init__.py`) +- [x] **Prompt N°08** — Modification `backend/app/services/simulation_runner.py` (7 zones : champs private_*, start, monitor, read_log, check_completed, get_actions, cleanup) +- [x] **Prompt N°09** — Frontend : `api/private.js` + `ModeSelector.vue` + `PrivateImpactView.vue` + route `/private/:projectId` +- [x] **Prompt N°10** — `action_logger.py` : ajout `get_private_logger()` + suppression fallback `run_private_simulation.py` + intégration `ModeSelector` dans `Home.vue` + +#### Fichiers créés / modifiés +| Fichier | Action | +|---|---| +| `backend/scripts/run_private_simulation.py` | Créé — moteur de simulation privé | +| `backend/scripts/private/` | Créé — répertoire de sortie actions.jsonl | +| `backend/app/services/private_impact_profile_generator.py` | Créé — générateur de profils relationnels | +| `backend/app/services/private_impact_config_generator.py` | Créé — générateur de paramètres comportementaux | +| `backend/app/services/private_impact_runner.py` | Créé — orchestrateur subprocess + monitoring | +| `CONTEXT.md` | Créé — ce fichier | +| `backend/app/api/private.py` | Créé — blueprint Flask /api/private-impact (7 routes) | +| `backend/app/api/__init__.py` | Modifié — ajout private_bp + import private | +| `backend/app/__init__.py` | Modifié — enregistrement private_bp | +| `backend/app/services/simulation_runner.py` | Modifié — 7 zones private (champs, start, monitor, read_log, check, get_actions, cleanup) | +| `frontend/src/api/private.js` | Créé — client API private impact (7 fonctions) | +| `frontend/src/components/ModeSelector.vue` | Créé — sélecteur Public / Private Impact (2 cartes) | +| `frontend/src/views/PrivateImpactView.vue` | Créé — wizard 5 étapes (form → prepare → run → report → chat) | +| `frontend/src/router/index.js` | Modifié — route `/private/:projectId` ajoutée | +| `backend/scripts/action_logger.py` | Modifié — ajout `get_private_logger()` à `SimulationLogManager` | +| `backend/scripts/run_private_simulation.py` | Modifié — suppression fallback `hasattr`, appel direct `log_manager.get_private_logger()` | +| `frontend/src/views/Home.vue` | Modifié — intégration `ModeSelector` (right panel) + `handleModeSelected` + sessionStorage `pendingSimMode` | + +#### Décisions d'architecture prises +- Pas d'env OASIS (pas de Twitter/Reddit/PlatformConfig) +- Appels LLM directs via `camel-ai ChatAgent` + `asyncio.to_thread()` +- Graphe relationnel construit depuis `cascade_influence` dans agent_configs +- `REACT_PRIVATELY` = invisible → ne propage pas l'exposition +- Tous les autres actions (sauf `DO_NOTHING`) cascade vers `cascade_influence` targets +- `zep_graph_memory_updater.py` réutilisé sans modification (platform="private") +- IPC `PrivateIPCHandler` : interviews via LLM direct (pas de SQLite) +- Output : `private/actions.jsonl` (même format JSONL que twitter/reddit) +- `RelationalAgentProfile` hérite de `OasisAgentProfile` — 8 champs relationnels ajoutés +- Encodage des dimensions relationnelles dans le champ `persona` (texte naturel) +- Fallback rule-based par type : Employee, Manager, Client, Competitor, Partner, FamilyMember +- `to_private_format()` retourne le dict lu par `run_private_simulation.py` +- `PrivateImpactConfigGenerator.generate_config()` : entrée = liste de dicts agents (issue de profile_generator), pas d'EntityNode direct +- `PrivateTimeConfig` : jours + rounds/jour (matin/midi/soir) — pas d'heures ni timezone +- `PrivateEventConfig` : injection par `decision_statement` — pas de posts sociaux +- `RelationalActivityConfig.exposure_round` : round 0 = exposition directe (distance 1) +- Fallback rule-based : table `RELATIONAL_FALLBACKS` dans le générateur (6 types) +- `PrivateImpactRunner` : même pattern classmethods que `SimulationRunner` (états en mémoire de classe) +- Config lue depuis `private_simulation_config.json` (≠ `simulation_config.json` OASIS) +- Log unique : `{sim_dir}/private/actions.jsonl` (une seule plateforme) +- `PrivateRunnerStatus` : enum séparé — pas de réutilisation de `RunnerStatus` +- `private_simulated_days` lu depuis le champ `simulated_day` du `round_end` event +- Frontend : CSS plain (pas Tailwind — non présent dans package.json) — même style que les vues existantes +- `ModeSelector.vue` : composant standalone, émet `@mode-selected` avec `"public"` ou `"private"`, à intégrer manuellement dans `Home.vue` ou `Process.vue` +- `PrivateImpactView.vue` : route `/private/:projectId` — charge le projet via `getProject()` pour récupérer `graph_id` +- Step 3 : polling `/api/private-impact/status/{simId}` toutes les 3s + affichage `recent_actions` depuis `to_detail_dict()` +- Step 4 : report via `generatePrivateReport()` → task_id → polling `getReportStatus(reportId)` → `getReport(reportId)` (réutilise le ReportManager existant) +- Step 5 : `chatAgents` reconstruit depuis la liste d'actions (agent_id + agent_name) ; chat via `interviewAgents()` (réutilise simulation.js) +- `SimulationRunState` : 5 champs `private_*` ajoutés (current_round, simulated_days, running, actions_count, completed) +- `add_action()` : elif platform=="private" → private_actions_count (évite comptage dans reddit) +- `to_dict()` : private_* inclus dans total_actions_count +- `start_simulation()` : elif platform=="private" → run_private_simulation.py + private_running=True +- `_monitor_simulation()` : lecture `private/actions.jsonl` dans la boucle ET en final ; private_running=False à la fin +- `_read_action_log()` : simulation_end → private_completed=True ; round_end → private_current_round + private_simulated_days (depuis simulated_day) +- `_check_all_platforms_completed()` : private_log + private_enabled + check private_completed +- `get_all_actions()` : bloc private après reddit (même pattern) +- `cleanup_simulation_logs()` : private_simulation.db + dirs_to_clean inclut "private" +- Blueprint `private_bp` enregistré sans url_prefix (les routes déclarent `/api/private-impact/...` en entier) +- `/prepare` stocke les métadonnées (graph_id, sim_requirement, agent_count…) dans `private_meta.json` dans le sim_dir +- `/prepare` appelle `ZepEntityReader.get_entities_by_type()` en boucle sur les types relationnels puis `PrivateImpactProfileGenerator.generate_profiles_from_entities()` +- `/start` lit `private_simulation_config.json` via `PrivateImpactRunner.start_simulation()` +- `/status` retourne `to_detail_dict()` (inclut `recent_actions`) +- `/report` réutilise `ReportAgent` avec `simulation_id=sim_id` et `graph_id` lu depuis `private_meta.json` +- `/cleanup` délègue entièrement à `PrivateImpactRunner.cleanup()` +- `get_private_logger()` ajouté à `SimulationLogManager` — même pattern que `get_twitter_logger()` / `get_reddit_logger()` ; fallback supprimé dans `run_private_simulation.py` +- `ModeSelector` intégré dans `Home.vue` (right panel, au-dessus de `.console-box`) ; mode stocké dans `sessionStorage` (`pendingSimMode`) — `MainView.vue` (N°11) doit lire ce flag et rediriger vers `/private/:projectId` après création du projet + +--- + +## Prochaines étapes + +| Prompt | Fichier cible | Action | +|---|---|---| +| N°04 | `backend/app/services/private_impact_profile_generator.py` | ✅ Terminé | +| N°05 | `backend/app/services/private_impact_config_generator.py` | ✅ Terminé | +| N°06 | `backend/app/services/private_impact_runner.py` | ✅ Terminé | +| N°07 | `backend/app/api/private.py` | ✅ Terminé | +| N°07 | `backend/app/api/__init__.py` | ✅ Terminé | +| N°07 | `backend/app/__init__.py` | ✅ Terminé | +| N°08 | `backend/app/services/simulation_runner.py` | ✅ Terminé | +| N°09 | `frontend/src/views/PrivateImpactView.vue` | ✅ Terminé | +| N°09 | `frontend/src/components/ModeSelector.vue` | ✅ Terminé | +| N°10 | `backend/scripts/action_logger.py` | ✅ Terminé — `get_private_logger()` ajouté | +| N°10 | `backend/scripts/run_private_simulation.py` | ✅ Terminé — fallback supprimé | +| N°10 | `frontend/src/views/Home.vue` | ✅ Terminé — ModeSelector intégré | +| N°11 | `frontend/src/views/MainView.vue` | Lire `sessionStorage.pendingSimMode` après création du projet → rediriger vers `/private/:projectId` si 'private' | +| N°11 | Test end-to-end | Préparer → Lancer → Observer actions.jsonl | + +## Point d'attention — `MainView.vue` (N°11) +**Status : ⏳ PENDING** + +`Home.vue` stocke `sessionStorage.pendingSimMode = 'private'` quand l'utilisateur sélectionne Private Impact. +`MainView.vue` doit être modifié pour lire ce flag après la création du projet + le build du graphe Zep, et rediriger vers `/private/:projectId` au lieu de rester sur la vue OASIS standard. + +**Action requise** dans `MainView.vue` — après la séquence upload → create_project → build_graph : +```javascript +const pendingMode = sessionStorage.getItem('pendingSimMode') +if (pendingMode === 'private') { + sessionStorage.removeItem('pendingSimMode') + router.push(`/private/${projectId}`) +} +``` diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index e6564686..00000000 --- a/Dockerfile +++ /dev/null @@ -1,29 +0,0 @@ -FROM python:3.11 - -# 安装 Node.js (满足 >=18)及必要工具 -RUN apt-get update \ - && apt-get install -y --no-install-recommends nodejs npm \ - && rm -rf /var/lib/apt/lists/* - -# 从 uv 官方镜像复制 uv -COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/ - -WORKDIR /app - -# 先复制依赖描述文件以利用缓存 -COPY package.json package-lock.json ./ -COPY frontend/package.json frontend/package-lock.json ./frontend/ -COPY backend/pyproject.toml backend/uv.lock ./backend/ - -# 安装依赖(Node + Python) -RUN npm ci \ - && npm ci --prefix frontend \ - && cd backend && uv sync --frozen - -# 复制项目源码 -COPY . . - -EXPOSE 3000 5001 - -# 同时启动前后端(开发模式) -CMD ["npm", "run", "dev"] \ No newline at end of file diff --git a/backend/app/__init__.py b/backend/app/__init__.py index aba624bb..9e0b04e7 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -67,6 +67,8 @@ def create_app(config_class=Config): app.register_blueprint(graph_bp, url_prefix='/api/graph') app.register_blueprint(simulation_bp, url_prefix='/api/simulation') app.register_blueprint(report_bp, url_prefix='/api/report') + from .api import private_bp + app.register_blueprint(private_bp) # 健康检查 @app.route('/health') diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index ffda743a..80f0ce2c 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -7,8 +7,10 @@ from flask import Blueprint graph_bp = Blueprint('graph', __name__) simulation_bp = Blueprint('simulation', __name__) report_bp = Blueprint('report', __name__) +private_bp = Blueprint('private', __name__) from . import graph # noqa: E402, F401 from . import simulation # noqa: E402, F401 from . import report # noqa: E402, F401 +from . import private # noqa: E402, F401 diff --git a/backend/app/api/private.py b/backend/app/api/private.py new file mode 100644 index 00000000..307c2f03 --- /dev/null +++ b/backend/app/api/private.py @@ -0,0 +1,558 @@ +""" +Private Impact API routes. + +Exposes the /api/private-impact endpoints for the Private Impact simulation mode. +Follows the same error handling and JSON response format as graph/simulation/report blueprints. +""" + +import json +import os +import traceback +import threading +import uuid +from datetime import datetime + +from flask import request, jsonify + +from . import private_bp +from ..config import Config +from ..services.private_impact_profile_generator import PrivateImpactProfileGenerator +from ..services.private_impact_config_generator import PrivateImpactConfigGenerator +from ..services.private_impact_runner import PrivateImpactRunner +from ..services.zep_entity_reader import ZepEntityReader +from ..services.report_agent import ReportAgent, ReportManager, ReportStatus +from ..models.task import TaskManager, TaskStatus +from ..models.project import ProjectManager +from ..utils.logger import get_logger +from ..utils.locale import t, get_locale, set_locale + +logger = get_logger('mirofish.api.private') + +# Simulation data directory (same root as PrivateImpactRunner.RUN_STATE_DIR) +_SIM_DIR = os.path.join(os.path.dirname(__file__), '../../uploads/simulations') + +# Relational entity types recognised by PrivateImpactProfileGenerator +_RELATIONAL_ENTITY_TYPES = [ + "employee", "manager", "client", "competitor", + "partner", "familymember", "colleague", "investor", +] + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _sim_dir(sim_id: str) -> str: + """Return absolute path to the simulation directory.""" + return os.path.join(_SIM_DIR, sim_id) + + +def _meta_path(sim_id: str) -> str: + return os.path.join(_sim_dir(sim_id), "private_meta.json") + + +def _read_meta(sim_id: str) -> dict: + """Load private_meta.json; return empty dict if missing.""" + path = _meta_path(sim_id) + if not os.path.exists(path): + return {} + with open(path, 'r', encoding='utf-8') as f: + return json.load(f) + + +def _write_meta(sim_id: str, meta: dict) -> None: + """Persist private_meta.json to disk.""" + os.makedirs(_sim_dir(sim_id), exist_ok=True) + with open(_meta_path(sim_id), 'w', encoding='utf-8') as f: + json.dump(meta, f, ensure_ascii=False, indent=2) + + +# ── POST /api/private-impact/prepare ────────────────────────────────────────── + +@private_bp.route('/private-impact/prepare', methods=['POST']) +def prepare_private_simulation(): + """ + Prepare a Private Impact simulation. + + Reads relational entities from the Zep graph, generates RelationalAgentProfile + instances via PrivateImpactProfileGenerator, then generates the full + PrivateSimulationParameters via PrivateImpactConfigGenerator. + Saves private_agents.json and private_simulation_config.json in the sim dir. + + Request (JSON): + { + "graph_id": "mirofish_xxxx", // Required (or project_id) + "project_id": "proj_xxxx", // Optional — used to resolve graph_id / sim_requirement + "simulation_requirement": "...", // Required if no project_id + "decision_context": "...", // Optional + "use_llm": true, // Optional, default true + "entity_types": ["employee", ...], // Optional filter (defaults to all relational types) + "sim_id": "private_xxxx" // Optional — reuse an existing sim_id + } + + Returns: + { "success": true, "data": { "sim_id": "...", "agent_count": N, "status": "prepared" } } + """ + try: + data = request.get_json() or {} + + # Resolve graph_id and simulation_requirement + project_id = data.get('project_id') + graph_id = data.get('graph_id') + simulation_requirement = data.get('simulation_requirement', '') + decision_context = data.get('decision_context', '') + + if project_id: + project = ProjectManager.get_project(project_id) + if not project: + return jsonify({ + "success": False, + "error": t('api.projectNotFound', id=project_id) + }), 404 + graph_id = graph_id or project.graph_id + simulation_requirement = simulation_requirement or project.simulation_requirement or '' + + if not graph_id: + return jsonify({ + "success": False, + "error": "graph_id is required" + }), 400 + + if not simulation_requirement: + return jsonify({ + "success": False, + "error": "simulation_requirement is required" + }), 400 + + if not Config.ZEP_API_KEY: + return jsonify({ + "success": False, + "error": t('api.zepApiKeyMissing') + }), 500 + + # Create or reuse sim_id + sim_id = data.get('sim_id') or f"private_{uuid.uuid4().hex[:12]}" + os.makedirs(_sim_dir(sim_id), exist_ok=True) + + use_llm = data.get('use_llm', True) + entity_types = data.get('entity_types') or _RELATIONAL_ENTITY_TYPES + + # Read relational entities from Zep + reader = ZepEntityReader() + all_entities = [] + for etype in entity_types: + try: + found = reader.get_entities_by_type( + graph_id=graph_id, + entity_type=etype, + enrich_with_edges=True, + ) + all_entities.extend(found) + logger.info(f"[PRIVATE] {len(found)} '{etype}' entities read") + except Exception as e: + logger.warning(f"[PRIVATE] Could not read '{etype}' entities: {e}") + + if not all_entities: + return jsonify({ + "success": False, + "error": "No relational entities found in the graph for the given entity types." + }), 404 + + # Generate RelationalAgentProfile instances + profile_generator = PrivateImpactProfileGenerator() + profiles_path = os.path.join(_sim_dir(sim_id), "private_agents.json") + profiles = profile_generator.generate_profiles_from_entities( + entities=all_entities, + use_llm=use_llm, + graph_id=graph_id, + realtime_output_path=profiles_path, + ) + + # Serialize profiles for config generator + agent_dicts = [p.to_private_format() for p in profiles] + + # Save profiles file (final) + with open(profiles_path, 'w', encoding='utf-8') as f: + json.dump(agent_dicts, f, ensure_ascii=False, indent=2) + + # Generate PrivateSimulationParameters + config_generator = PrivateImpactConfigGenerator() + params = config_generator.generate_config( + agent_profiles=agent_dicts, + simulation_requirement=simulation_requirement, + decision_context=decision_context, + ) + + # Save private_simulation_config.json (consumed by PrivateImpactRunner) + config_path = os.path.join(_sim_dir(sim_id), "private_simulation_config.json") + with open(config_path, 'w', encoding='utf-8') as f: + json.dump(params.to_dict(), f, ensure_ascii=False, indent=2) + + # Persist metadata for subsequent endpoints + _write_meta(sim_id, { + "sim_id": sim_id, + "project_id": project_id, + "graph_id": graph_id, + "simulation_requirement": simulation_requirement, + "decision_context": decision_context, + "agent_count": len(profiles), + "created_at": datetime.now().isoformat(), + "status": "prepared", + }) + + logger.info( + f"[PRIVATE] Simulation prepared: {sim_id}, " + f"agents={len(profiles)}, graph_id={graph_id}" + ) + + return jsonify({ + "success": True, + "data": { + "sim_id": sim_id, + "agent_count": len(profiles), + "status": "prepared", + } + }) + + except Exception as e: + logger.error(f"[PRIVATE] Prepare failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── POST /api/private-impact/start ──────────────────────────────────────────── + +@private_bp.route('/private-impact/start', methods=['POST']) +def start_private_simulation(): + """ + Launch a prepared Private Impact simulation. + + Request (JSON): + { + "sim_id": "private_xxxx", // Required + "max_rounds": null, // Optional + "enable_graph_memory_update": false, // Optional + "graph_id": null // Optional (required if enable_graph_memory_update) + } + + Returns: + { "success": true, "data": { "sim_id": "...", "status": "running" } } + """ + try: + data = request.get_json() or {} + + sim_id = data.get('sim_id') + if not sim_id: + return jsonify({ + "success": False, + "error": "sim_id is required" + }), 400 + + max_rounds = data.get('max_rounds') + enable_graph_memory = data.get('enable_graph_memory_update', False) + graph_id = data.get('graph_id') + + # Resolve graph_id from meta if needed + if enable_graph_memory and not graph_id: + meta = _read_meta(sim_id) + graph_id = meta.get('graph_id') + + state = PrivateImpactRunner.start_simulation( + simulation_id=sim_id, + max_rounds=max_rounds, + enable_graph_memory_update=enable_graph_memory, + graph_id=graph_id, + ) + + return jsonify({ + "success": True, + "data": { + "sim_id": sim_id, + "status": state.runner_status.value, + } + }) + + except ValueError as e: + return jsonify({ + "success": False, + "error": str(e) + }), 400 + except Exception as e: + logger.error(f"[PRIVATE] Start failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── GET /api/private-impact/status/ ─────────────────────────────────── + +@private_bp.route('/private-impact/status/', methods=['GET']) +def get_private_status(sim_id: str): + """ + Return the current run state of a Private Impact simulation. + + Returns: + { "success": true, "data": PrivateSimulationRunState.to_dict() } + """ + try: + state = PrivateImpactRunner.get_status(sim_id) + + if not state: + return jsonify({ + "success": False, + "error": f"No private simulation found for sim_id: {sim_id}" + }), 404 + + return jsonify({ + "success": True, + "data": state.to_detail_dict() + }) + + except Exception as e: + logger.error(f"[PRIVATE] Get status failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── POST /api/private-impact/stop/ ──────────────────────────────────── + +@private_bp.route('/private-impact/stop/', methods=['POST']) +def stop_private_simulation(sim_id: str): + """ + Stop a running Private Impact simulation. + + Returns: + { "success": true, "data": { "sim_id": "...", "status": "stopped" } } + """ + try: + state = PrivateImpactRunner.stop_simulation(sim_id) + + return jsonify({ + "success": True, + "data": { + "sim_id": sim_id, + "status": state.runner_status.value, + } + }) + + except ValueError as e: + return jsonify({ + "success": False, + "error": str(e) + }), 400 + except Exception as e: + logger.error(f"[PRIVATE] Stop failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── GET /api/private-impact/actions/ ────────────────────────────────── + +@private_bp.route('/private-impact/actions/', methods=['GET']) +def get_private_actions(sim_id: str): + """ + Return the full private action log for a simulation. + + Query params: + agent_id: Filter by agent ID (optional, int) + round_num: Filter by round number (optional, int) + + Returns: + { "success": true, "data": [...], "count": N } + """ + try: + agent_id_raw = request.args.get('agent_id') + round_num_raw = request.args.get('round_num') + + agent_id = int(agent_id_raw) if agent_id_raw is not None else None + round_num = int(round_num_raw) if round_num_raw is not None else None + + actions = PrivateImpactRunner.get_all_actions( + simulation_id=sim_id, + agent_id=agent_id, + round_num=round_num, + ) + + return jsonify({ + "success": True, + "data": [a.to_dict() for a in actions], + "count": len(actions) + }) + + except Exception as e: + logger.error(f"[PRIVATE] Get actions failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── POST /api/private-impact/report/ ────────────────────────────────── + +@private_bp.route('/private-impact/report/', methods=['POST']) +def generate_private_report(sim_id: str): + """ + Generate an analysis report for a Private Impact simulation. + + Reuses ReportAgent from report_agent.py with the private simulation actions. + Launches the generation in a background thread and returns a task_id immediately. + + Request (JSON): + { "force_regenerate": false } // Optional + + Returns: + { "success": true, "data": { "sim_id": "...", "report_id": "...", "task_id": "..." } } + """ + try: + data = request.get_json() or {} + force_regenerate = data.get('force_regenerate', False) + + meta = _read_meta(sim_id) + graph_id = meta.get('graph_id') + simulation_requirement = meta.get('simulation_requirement', '') + + if not graph_id: + return jsonify({ + "success": False, + "error": f"No metadata found for sim_id: {sim_id}. Run /prepare first." + }), 404 + + # Check for an existing completed report + if not force_regenerate: + existing = ReportManager.get_report_by_simulation(sim_id) + if existing and existing.status == ReportStatus.COMPLETED: + return jsonify({ + "success": True, + "data": { + "sim_id": sim_id, + "report_id": existing.report_id, + "status": "completed", + "already_generated": True, + } + }) + + # Pre-generate report_id so the frontend can track immediately + report_id = f"report_{uuid.uuid4().hex[:12]}" + + task_manager = TaskManager() + task_id = task_manager.create_task( + task_type="private_report_generate", + metadata={ + "sim_id": sim_id, + "graph_id": graph_id, + "report_id": report_id, + } + ) + + current_locale = get_locale() + + def run_generate(): + set_locale(current_locale) + try: + task_manager.update_task( + task_id, + status=TaskStatus.PROCESSING, + progress=0, + message="Initialising Report Agent for private simulation..." + ) + + agent = ReportAgent( + graph_id=graph_id, + simulation_id=sim_id, + simulation_requirement=simulation_requirement, + ) + + def progress_callback(stage, progress, message): + task_manager.update_task( + task_id, + progress=progress, + message=f"[{stage}] {message}" + ) + + report = agent.generate_report( + progress_callback=progress_callback, + report_id=report_id, + ) + ReportManager.save_report(report) + + if report.status == ReportStatus.COMPLETED: + task_manager.complete_task( + task_id, + result={ + "report_id": report.report_id, + "sim_id": sim_id, + "status": "completed", + } + ) + else: + task_manager.fail_task(task_id, report.error or "Report generation failed") + + except Exception as exc: + logger.error(f"[PRIVATE] Report generation failed: {exc}") + task_manager.fail_task(task_id, str(exc)) + + thread = threading.Thread(target=run_generate, daemon=True) + thread.start() + + return jsonify({ + "success": True, + "data": { + "sim_id": sim_id, + "report_id": report_id, + "task_id": task_id, + "status": "generating", + "already_generated": False, + } + }) + + except Exception as e: + logger.error(f"[PRIVATE] Report trigger failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 + + +# ── DELETE /api/private-impact/cleanup/ ─────────────────────────────── + +@private_bp.route('/private-impact/cleanup/', methods=['DELETE']) +def cleanup_private_simulation(sim_id: str): + """ + Remove Private Impact simulation artifacts to allow a fresh restart. + + Deletes run_state.json, simulation.log, private_simulation.db, private/ directory. + Does NOT delete private_simulation_config.json or profile files. + + Returns: + { "success": true, "data": { "sim_id": "...", "cleaned_files": [...] } } + """ + try: + result = PrivateImpactRunner.cleanup(sim_id) + + return jsonify({ + "success": result["success"], + "data": { + "sim_id": sim_id, + "cleaned_files": result["cleaned_files"], + "errors": result["errors"], + } + }) + + except Exception as e: + logger.error(f"[PRIVATE] Cleanup failed: {str(e)}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc() + }), 500 diff --git a/backend/app/services/private_impact_config_generator.py b/backend/app/services/private_impact_config_generator.py new file mode 100644 index 00000000..c17784cd --- /dev/null +++ b/backend/app/services/private_impact_config_generator.py @@ -0,0 +1,735 @@ +""" +Private Impact Config Generator + +Generates behavioral parameters for RelationalAgents in Private Impact mode. +Equivalent of simulation_config_generator.py for the private relational network. + +Key differences from SimulationConfigGenerator: +- No PlatformConfig (no social network) +- Time is measured in days with rounds per day (morning / noon / evening) +- RelationalActivityConfig replaces AgentActivityConfig +- PrivateEventConfig uses a decision statement instead of initial posts +- LLM calls: time config → event config → agent configs (batches of 15) +""" + +import json +import math +from dataclasses import dataclass, field, asdict +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional + +from openai import OpenAI + +from ..config import Config +from ..utils.logger import get_logger +from ..utils.locale import get_language_instruction + +logger = get_logger('mirofish.private_impact_config') + + +# ── Dataclasses ─────────────────────────────────────────────────────────────── + +@dataclass +class RelationalActivityConfig: + """ + Behavioral activity configuration for a single RelationalAgent. + + Equivalent of AgentActivityConfig for the private simulation mode. + No posting frequencies — private reactions replace social media posts. + """ + agent_id: int + entity_uuid: str + entity_name: str + relational_link_type: str # employee | manager | client | competitor | partner | familymember + + # Activity parameters + activity_level: float = 0.5 # Overall engagement level (0.0–1.0) + response_delay_min: int = 0 # Min reaction delay (days) + response_delay_max: int = 3 # Max reaction delay (days) + + # Behavioral stance toward the decision + sentiment_bias: float = 0.0 # -1.0 (hostile) → 1.0 (supportive) + stance: str = "neutral" # supportive | opposing | neutral | observer + + # Influence within the relational graph + influence_weight: float = 1.0 # Weight for cascade propagation + + # Relational graph: agent_ids this agent can expose to the decision + cascade_influence: List[int] = field(default_factory=list) + + # Simulation round at which this agent is first exposed to the decision + exposure_round: int = 0 # 0 = exposed at the very first round + + +@dataclass +class PrivateTimeConfig: + """ + Time configuration for the Private Impact simulation. + + Replaces TimeSimulationConfig (Twitter/hour-based) with a day-based, + relational-rhythm model: days × rounds per day. + """ + total_simulation_days: int = 30 # Total simulated days + rounds_per_day: int = 3 # Morning / noon / evening + reaction_delay_days_min: int = 0 # Min delay before an agent reacts + reaction_delay_days_max: int = 7 # Max delay before an agent reacts + + +@dataclass +class PrivateEventConfig: + """ + Event configuration for the Private Impact simulation. + + Replaces EventConfig (social posts) with a decision injection model. + """ + decision_statement: str = "" # The decision injected into the network + decision_maker_profile: str = "" # Short description of the decision maker + hot_topics: List[str] = field(default_factory=list) # Related sensitive topics + initial_exposed_agent_ids: List[int] = field( # Distance-1 agents (heard it first) + default_factory=list + ) + + +@dataclass +class PrivateSimulationParameters: + """Complete parameter set for a Private Impact simulation.""" + time_config: PrivateTimeConfig = field(default_factory=PrivateTimeConfig) + agent_configs: List[RelationalActivityConfig] = field(default_factory=list) + event_config: PrivateEventConfig = field(default_factory=PrivateEventConfig) + + # LLM metadata + llm_model: str = "" + llm_base_url: str = "" + generated_at: str = field(default_factory=lambda: datetime.now().isoformat()) + generation_reasoning: str = "" + + def to_dict(self) -> Dict[str, Any]: + """Serialize to plain dict.""" + return { + "time_config": asdict(self.time_config), + "agent_configs": [asdict(a) for a in self.agent_configs], + "event_config": asdict(self.event_config), + "llm_model": self.llm_model, + "llm_base_url": self.llm_base_url, + "generated_at": self.generated_at, + "generation_reasoning": self.generation_reasoning, + } + + def to_json(self, indent: int = 2) -> str: + """Serialize to JSON string.""" + return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent) + + +# ── PrivateImpactConfigGenerator ────────────────────────────────────────────── + +class PrivateImpactConfigGenerator: + """ + Generates PrivateSimulationParameters for the Private Impact simulation. + + Equivalent of SimulationConfigGenerator for the private relational mode. + Uses 3 sequential LLM calls: + 1. PrivateTimeConfig — relational rhythm (days, rounds) + 2. PrivateEventConfig — decision injection setup + 3. RelationalActivityConfig list — batches of AGENTS_PER_BATCH + + Falls back to a rule-based table per relational type on LLM failure. + """ + + AGENTS_PER_BATCH = 15 + + # Context length limits (characters) + TIME_CONFIG_CONTEXT_LENGTH = 8000 + EVENT_CONFIG_CONTEXT_LENGTH = 6000 + AGENT_SUMMARY_LENGTH = 300 + + # Rule-based fallback table by relational type + # Keys: activity_level, response_delay_min, response_delay_max, stance, influence_weight + RELATIONAL_FALLBACKS: Dict[str, Dict[str, Any]] = { + "employee": { + "activity_level": 0.6, + "response_delay_min": 0, + "response_delay_max": 3, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 0.8, + }, + "manager": { + "activity_level": 0.5, + "response_delay_min": 0, + "response_delay_max": 1, + "sentiment_bias": 0.0, + "stance": "neutral", + "influence_weight": 1.5, + }, + "client": { + "activity_level": 0.3, + "response_delay_min": 2, + "response_delay_max": 7, + "sentiment_bias": 0.0, + "stance": "observer", + "influence_weight": 1.2, + }, + "competitor": { + "activity_level": 0.2, + "response_delay_min": 1, + "response_delay_max": 5, + "sentiment_bias": -0.3, + "stance": "opposing", + "influence_weight": 1.0, + }, + "partner": { + "activity_level": 0.4, + "response_delay_min": 0, + "response_delay_max": 2, + "sentiment_bias": 0.1, + "stance": "neutral", + "influence_weight": 1.3, + }, + "familymember": { + "activity_level": 0.7, + "response_delay_min": 0, + "response_delay_max": 0, + "sentiment_bias": 0.4, + "stance": "supportive", + "influence_weight": 0.9, + }, + } + + def __init__( + self, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + model_name: Optional[str] = None, + ): + self.api_key = api_key or Config.LLM_API_KEY + self.base_url = base_url or Config.LLM_BASE_URL + self.model_name = model_name or Config.LLM_MODEL_NAME + + if not self.api_key: + raise ValueError("LLM_API_KEY is not configured") + + self.client = OpenAI( + api_key=self.api_key, + base_url=self.base_url, + ) + + # ── Public API ───────────────────────────────────────────────────────────── + + def generate_config( + self, + agent_profiles: List[Dict[str, Any]], + simulation_requirement: str, + decision_context: str = "", + progress_callback: Optional[Callable[[int, int, str], None]] = None, + ) -> PrivateSimulationParameters: + """ + Generate the complete PrivateSimulationParameters. + + Performs 3 sequential LLM call groups: + Step 1 — PrivateTimeConfig (relational rhythm) + Step 2 — PrivateEventConfig (decision injection) + Step 3-N — RelationalActivityConfig batches (AGENTS_PER_BATCH each) + + Falls back to rule-based generation per step on LLM failure. + + Args: + agent_profiles: List of agent dicts from private_impact_profile_generator. + Each dict must include: agent_id, entity_name, relational_link_type, + cascade_influence, source_entity_uuid, persona (optional). + simulation_requirement: Natural language description of the simulation goal. + decision_context: Additional context about the decision or the organization. + progress_callback: Optional callback(current_step, total_steps, message). + + Returns: + PrivateSimulationParameters ready for run_private_simulation.py. + """ + num_agents = len(agent_profiles) + num_batches = math.ceil(num_agents / self.AGENTS_PER_BATCH) + total_steps = 2 + num_batches # time + event + N agent batches + current_step = 0 + + def report(step: int, message: str) -> None: + nonlocal current_step + current_step = step + if progress_callback: + progress_callback(step, total_steps, message) + logger.info(f"[{step}/{total_steps}] {message}") + + context = self._build_context( + simulation_requirement=simulation_requirement, + decision_context=decision_context, + agent_profiles=agent_profiles, + ) + reasoning_parts: List[str] = [] + + # ── Step 1: PrivateTimeConfig ───────────────────────────────────────── + report(1, "Generating relational time configuration...") + time_result = self._generate_time_config(context, num_agents) + time_config = self._parse_time_config(time_result) + reasoning_parts.append(f"Time: {time_result.get('reasoning', 'ok')}") + + # ── Step 2: PrivateEventConfig ──────────────────────────────────────── + report(2, "Generating decision event configuration...") + event_result = self._generate_event_config(context, simulation_requirement, agent_profiles) + event_config = self._parse_event_config(event_result, agent_profiles) + reasoning_parts.append(f"Event: {event_result.get('reasoning', 'ok')}") + + # ── Steps 3-N: RelationalActivityConfig batches ─────────────────────── + all_agent_configs: List[RelationalActivityConfig] = [] + for batch_idx in range(num_batches): + start = batch_idx * self.AGENTS_PER_BATCH + end = min(start + self.AGENTS_PER_BATCH, num_agents) + batch = agent_profiles[start:end] + + report( + 3 + batch_idx, + f"Generating agent configs {start + 1}–{end} / {num_agents}...", + ) + + batch_configs = self._generate_agent_configs_batch( + context=context, + agent_profiles=batch, + start_idx=start, + simulation_requirement=simulation_requirement, + ) + all_agent_configs.extend(batch_configs) + + reasoning_parts.append(f"Agents: {len(all_agent_configs)} configured") + + return PrivateSimulationParameters( + time_config=time_config, + agent_configs=all_agent_configs, + event_config=event_config, + llm_model=self.model_name, + llm_base_url=self.base_url, + generation_reasoning=" | ".join(reasoning_parts), + ) + + # ── Context builder ──────────────────────────────────────────────────────── + + def _build_context( + self, + simulation_requirement: str, + decision_context: str, + agent_profiles: List[Dict[str, Any]], + ) -> str: + """Build the shared LLM context string for all generation steps.""" + by_type: Dict[str, int] = {} + for a in agent_profiles: + rtype = a.get("relational_link_type", "unknown") + by_type[rtype] = by_type.get(rtype, 0) + 1 + + type_summary = "\n".join( + f" - {rtype}: {count}" for rtype, count in sorted(by_type.items()) + ) + + parts = [ + f"## Simulation Requirement\n{simulation_requirement}", + f"\n## Relational Network ({len(agent_profiles)} agents)\n{type_summary}", + ] + + if decision_context: + parts.append(f"\n## Decision Context\n{decision_context[:3000]}") + + return "\n".join(parts) + + # ── Step 1: Time config ──────────────────────────────────────────────────── + + def _generate_time_config( + self, context: str, num_agents: int + ) -> Dict[str, Any]: + """Generate PrivateTimeConfig via LLM.""" + context_truncated = context[:self.TIME_CONFIG_CONTEXT_LENGTH] + + prompt = f"""Based on the following private impact simulation context, generate a relational time configuration. + +{context_truncated} + +## Task +Generate a time configuration for a private relational network simulation. +Unlike social media, this is a closed human network where decisions propagate +over days through interpersonal channels (conversations, emails, hallway talks). + +Return a JSON object (no markdown): +{{ + "total_simulation_days": , + "rounds_per_day": , + "reaction_delay_days_min": , + "reaction_delay_days_max": , + "reasoning": "" +}} + +Guidelines: +- Major organizational decisions: 30–60 days +- Personal or family decisions: 7–21 days +- Sudden crises: 7–14 days +- rounds_per_day=3 is the standard (morning / noon / evening) +- reaction_delay_days_max should not exceed total_simulation_days / 4""" + + system_prompt = ( + "You are an expert in organizational psychology and network dynamics. " + "Return pure JSON only — no markdown. " + f"{get_language_instruction()}" + ) + + try: + return self._call_llm_with_retry(prompt, system_prompt) + except Exception as e: + logger.warning(f"Time config LLM failed: {e}. Using defaults.") + return self._default_time_config() + + def _default_time_config(self) -> Dict[str, Any]: + """Rule-based default time config.""" + return { + "total_simulation_days": 30, + "rounds_per_day": 3, + "reaction_delay_days_min": 0, + "reaction_delay_days_max": 7, + "reasoning": "Default relational time config", + } + + def _parse_time_config(self, result: Dict[str, Any]) -> PrivateTimeConfig: + """Parse and validate PrivateTimeConfig from LLM result.""" + total_days = max(7, int(result.get("total_simulation_days", 30))) + rounds_per_day = max(2, min(4, int(result.get("rounds_per_day", 3)))) + delay_min = max(0, int(result.get("reaction_delay_days_min", 0))) + delay_max = max(delay_min, int(result.get("reaction_delay_days_max", 7))) + + if delay_max >= total_days: + delay_max = max(delay_min + 1, total_days // 4) + logger.warning(f"reaction_delay_days_max capped to {delay_max}") + + return PrivateTimeConfig( + total_simulation_days=total_days, + rounds_per_day=rounds_per_day, + reaction_delay_days_min=delay_min, + reaction_delay_days_max=delay_max, + ) + + # ── Step 2: Event config ─────────────────────────────────────────────────── + + def _generate_event_config( + self, + context: str, + simulation_requirement: str, + agent_profiles: List[Dict[str, Any]], + ) -> Dict[str, Any]: + """Generate PrivateEventConfig via LLM.""" + context_truncated = context[:self.EVENT_CONFIG_CONTEXT_LENGTH] + + # Build distance-1 candidate list (agents closest to the decision maker) + distance1_candidates = [ + { + "agent_id": a.get("agent_id", i), + "name": a.get("entity_name", ""), + "type": a.get("relational_link_type", ""), + } + for i, a in enumerate(agent_profiles) + if a.get("relational_link_type", "") in ( + "manager", "employee", "partner", "familymember" + ) + ][:10] + + candidates_json = json.dumps(distance1_candidates, ensure_ascii=False) + + prompt = f"""Based on the following private impact simulation context, generate the event configuration. + +{context_truncated} + +## Distance-1 agent candidates (closest to decision maker) +{candidates_json} + +## Task +Generate an event configuration for private impact injection. +Return a JSON object (no markdown): +{{ + "decision_statement": "", + "decision_maker_profile": "", + "hot_topics": ["", "", ...], + "initial_exposed_agent_ids": [, ...], + "reasoning": "" +}} + +Rules: +- decision_statement must be specific and concrete, not vague +- initial_exposed_agent_ids must only contain ids from the distance-1 candidates list above +- initial_exposed_agent_ids should be 1–3 agents (direct announcement recipients) +- hot_topics: 3–6 strings describing the sensitive dimensions of this decision + (e.g. "salary equity", "job security", "family impact", "competitive pressure")""" + + system_prompt = ( + "You are an expert in organizational decision impact simulation. " + "Return pure JSON only — no markdown. " + f"{get_language_instruction()}" + ) + + try: + return self._call_llm_with_retry(prompt, system_prompt) + except Exception as e: + logger.warning(f"Event config LLM failed: {e}. Using defaults.") + first_id = agent_profiles[0].get("agent_id", 0) if agent_profiles else 0 + return { + "decision_statement": simulation_requirement, + "decision_maker_profile": "The decision maker", + "hot_topics": ["organizational change", "impact"], + "initial_exposed_agent_ids": [first_id], + "reasoning": "Default event config (LLM fallback)", + } + + def _parse_event_config( + self, + result: Dict[str, Any], + agent_profiles: List[Dict[str, Any]], + ) -> PrivateEventConfig: + """Parse and validate PrivateEventConfig from LLM result.""" + valid_ids = {a.get("agent_id", i) for i, a in enumerate(agent_profiles)} + raw_exposed = result.get("initial_exposed_agent_ids", []) + exposed = [aid for aid in raw_exposed if aid in valid_ids] + + if not exposed and agent_profiles: + exposed = [agent_profiles[0].get("agent_id", 0)] + + return PrivateEventConfig( + decision_statement=result.get("decision_statement", ""), + decision_maker_profile=result.get("decision_maker_profile", ""), + hot_topics=result.get("hot_topics", []), + initial_exposed_agent_ids=exposed, + ) + + # ── Steps 3-N: Agent config batches ─────────────────────────────────────── + + def _generate_agent_configs_batch( + self, + context: str, + agent_profiles: List[Dict[str, Any]], + start_idx: int, + simulation_requirement: str, + ) -> List[RelationalActivityConfig]: + """ + Generate a batch of RelationalActivityConfig via LLM with rule-based fallback. + + Args: + context: Shared simulation context string. + agent_profiles: Slice of agent dicts for this batch. + start_idx: Index of the first agent in the full list (for logging). + simulation_requirement: Natural language simulation goal. + + Returns: + List of RelationalActivityConfig for this batch. + """ + agent_list = [] + summary_len = self.AGENT_SUMMARY_LENGTH + for i, a in enumerate(agent_profiles): + agent_list.append({ + "agent_id": a.get("agent_id", start_idx + i), + "entity_name": a.get("entity_name", ""), + "relational_link_type": a.get("relational_link_type", "peer"), + "cascade_influence": a.get("cascade_influence", []), + "persona_excerpt": (a.get("persona", "") or "")[:summary_len], + }) + + prompt = f"""Based on the following private impact simulation context, generate activity configurations for each relational agent. + +Simulation requirement: {simulation_requirement} + +## Agent list +```json +{json.dumps(agent_list, ensure_ascii=False, indent=2)} +``` + +## Task +Generate a RelationalActivityConfig for each agent. + +Behavioral guidelines by relational type: +- employee: activity_level=0.6, response_delay_max=3 days, stance=neutral +- manager: activity_level=0.5, response_delay_max=1 day, stance=neutral +- client: activity_level=0.3, response_delay_max=7 days, stance=observer +- competitor: activity_level=0.2, response_delay_max=5 days, stance=opposing +- partner: activity_level=0.4, response_delay_max=2 days, stance=neutral +- familymember: activity_level=0.7, response_delay_max=0 days, stance=supportive + +Return a JSON object (no markdown): +{{ + "agent_configs": [ + {{ + "agent_id": , + "activity_level": <0.0–1.0>, + "response_delay_min": , + "response_delay_max": , + "sentiment_bias": <-1.0 to 1.0>, + "stance": "", + "influence_weight": , + "exposure_round": + }}, + ... + ] +}} + +Rules: +- agent_id must match the input exactly +- stance must be one of: supportive, opposing, neutral, observer +- exposure_round = 0 for agents in initial_exposed_agent_ids, else infer from cascade distance""" + + system_prompt = ( + "You are an expert in private organizational network dynamics. " + "Return pure JSON only — no markdown. " + "IMPORTANT: The 'stance' field MUST be one of: " + "'supportive', 'opposing', 'neutral', 'observer'. " + f"{get_language_instruction()}" + ) + + try: + result = self._call_llm_with_retry(prompt, system_prompt) + llm_map: Dict[int, Dict[str, Any]] = { + cfg["agent_id"]: cfg for cfg in result.get("agent_configs", []) + } + except Exception as e: + logger.warning(f"Agent config batch LLM failed: {e}. Using rule-based fallback.") + llm_map = {} + + configs: List[RelationalActivityConfig] = [] + for i, agent in enumerate(agent_profiles): + agent_id = agent.get("agent_id", start_idx + i) + rtype = agent.get("relational_link_type", "employee").lower() + cfg = llm_map.get(agent_id) + + if not cfg: + cfg = self._agent_config_by_rule(rtype) + + configs.append(RelationalActivityConfig( + agent_id=agent_id, + entity_uuid=agent.get("source_entity_uuid", ""), + entity_name=agent.get("entity_name", ""), + relational_link_type=rtype, + activity_level=float(cfg.get("activity_level", 0.5)), + response_delay_min=int(cfg.get("response_delay_min", 0)), + response_delay_max=int(cfg.get("response_delay_max", 3)), + sentiment_bias=float(cfg.get("sentiment_bias", 0.0)), + stance=cfg.get("stance", "neutral"), + influence_weight=float(cfg.get("influence_weight", 1.0)), + cascade_influence=agent.get("cascade_influence", []), + exposure_round=int(cfg.get("exposure_round", 0)), + )) + + return configs + + def _agent_config_by_rule(self, relational_type: str) -> Dict[str, Any]: + """ + Return rule-based activity config for a given relational type. + + Falls back to 'employee' defaults for unknown types. + + Args: + relational_type: Lowercase relational type string. + + Returns: + Dict with activity_level, response_delay_min/max, sentiment_bias, + stance, influence_weight. + """ + return dict(self.RELATIONAL_FALLBACKS.get( + relational_type, self.RELATIONAL_FALLBACKS["employee"] + )) + + # ── LLM helpers ─────────────────────────────────────────────────────────── + + def _call_llm_with_retry(self, prompt: str, system_prompt: str) -> Dict[str, Any]: + """ + Call the LLM with up to 3 retry attempts. + + Mirrors SimulationConfigGenerator._call_llm_with_retry with: + - JSON response format enforced + - Temperature annealing on each retry + - Truncation repair via _fix_truncated_json + + Args: + prompt: User prompt. + system_prompt: System instructions. + + Returns: + Parsed JSON dict. + + Raises: + Exception: If all attempts fail. + """ + import time + + max_attempts = 3 + last_error: Optional[Exception] = None + + for attempt in range(max_attempts): + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + temperature=0.7 - (attempt * 0.1), + ) + content = response.choices[0].message.content + finish_reason = response.choices[0].finish_reason + + if finish_reason == "length": + logger.warning( + f"LLM output truncated (attempt {attempt + 1}), attempting repair..." + ) + content = self._fix_truncated_json(content) + + try: + return json.loads(content) + except json.JSONDecodeError as e: + logger.warning(f"JSON parse failed (attempt {attempt + 1}): {str(e)[:80]}") + fixed = self._try_fix_config_json(content) + if fixed: + return fixed + last_error = e + + except Exception as e: + logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)[:80]}") + last_error = e + time.sleep(2 * (attempt + 1)) + + raise last_error or Exception("LLM call failed after all retries") + + def _fix_truncated_json(self, content: str) -> str: + """Repair truncated JSON by closing unclosed brackets and strings.""" + content = content.strip() + open_braces = content.count("{") - content.count("}") + open_brackets = content.count("[") - content.count("]") + + if content and content[-1] not in '",}]': + content += '"' + + content += "]" * open_brackets + content += "}" * open_braces + return content + + def _try_fix_config_json(self, content: str) -> Optional[Dict[str, Any]]: + """Attempt to extract and repair a JSON object from malformed LLM output.""" + import re + + content = self._fix_truncated_json(content) + json_match = re.search(r"\{[\s\S]*\}", content) + if not json_match: + return None + + json_str = json_match.group() + + def fix_string(match: re.Match) -> str: + s = match.group(0) + s = s.replace("\n", " ").replace("\r", " ") + s = re.sub(r"\s+", " ", s) + return s + + json_str = re.sub(r'"[^"\\]*(?:\\.[^"\\]*)*"', fix_string, json_str) + + try: + return json.loads(json_str) + except Exception: + json_str = re.sub(r"[\x00-\x1f\x7f-\x9f]", " ", json_str) + json_str = re.sub(r"\s+", " ", json_str) + try: + return json.loads(json_str) + except Exception: + return None diff --git a/backend/app/services/private_impact_profile_generator.py b/backend/app/services/private_impact_profile_generator.py new file mode 100644 index 00000000..908ac227 --- /dev/null +++ b/backend/app/services/private_impact_profile_generator.py @@ -0,0 +1,823 @@ +""" +Private Impact Profile Generator + +Converts Zep graph entities into RelationalAgentProfile instances for the +Private Impact simulation mode. + +Extends OasisProfileGenerator with relational dimensions: +- Relationship type with the decision maker (hierarchical, client, peer, ...) +- Trust level, financial sensitivity, equity tolerance, institutional loyalty +- Natural reaction mode (internalize, confront, silent_leave, coalition_build) +- Cascade influence graph (which agents this agent can expose) + +Key design principle (from IDEE-FORK-MIROFISH.md §4): + The `persona` field is the sole behavioral vector injected into the LLM + system prompt each round. All relational dimensions are encoded as natural + language inside `persona` — no engine modification required. +""" + +import json +import random +import time +from dataclasses import dataclass, field +from datetime import datetime +from threading import Lock +from typing import Any, Dict, List, Optional + +import concurrent.futures + +from openai import OpenAI +from zep_cloud.client import Zep + +from ..config import Config +from ..utils.logger import get_logger +from ..utils.locale import get_language_instruction, get_locale, set_locale +from .oasis_profile_generator import OasisAgentProfile, OasisProfileGenerator +from .zep_entity_reader import EntityNode + +logger = get_logger('mirofish.private_impact_profile') + + +# ── RelationalAgentProfile ──────────────────────────────────────────────────── + +@dataclass +class RelationalAgentProfile(OasisAgentProfile): + """ + Extended OASIS Agent Profile with relational network dimensions. + + All relational fields are encoded into the `persona` text field via + _encode_relational_persona() before being stored. The inherited `persona` + is what gets injected into the LLM system prompt each simulation round. + """ + + # Relationship with the decision maker + relational_link_type: str = "peer" # hierarchical | client | peer | family | competitor + relational_seniority_years: int = 0 + relational_trust_level: float = 0.5 # 0.0 → 1.0 + + # Psycho-social dimensions + financial_sensitivity: float = 0.5 # Sensitivity to wealth signals + equity_tolerance: float = 0.5 # Tolerance for status disparities + institutional_loyalty: float = 0.5 # Loyalty to the org vs the person + + # Natural reaction mode when facing a triggering decision + private_reaction_mode: str = "internalize" # internalize | confront | silent_leave | coalition_build + + # Cascade influence graph: agent_ids this agent can expose + cascade_influence: List[int] = field(default_factory=list) + + def to_private_format(self) -> Dict[str, Any]: + """ + Serialize to the format expected by run_private_simulation.py. + + The simulation engine reads agent_configs as plain dicts, accessing: + agent_id, entity_name, persona, cascade_influence, active_hours, + activity_level. + """ + return { + "agent_id": self.user_id, + "entity_name": self.name, + "user_name": self.user_name, + "bio": self.bio, + "persona": self.persona, # Encoded with relational context + "cascade_influence": self.cascade_influence, + "relational_link_type": self.relational_link_type, + "relational_seniority_years": self.relational_seniority_years, + "relational_trust_level": self.relational_trust_level, + "financial_sensitivity": self.financial_sensitivity, + "equity_tolerance": self.equity_tolerance, + "institutional_loyalty": self.institutional_loyalty, + "private_reaction_mode": self.private_reaction_mode, + "age": self.age, + "gender": self.gender, + "mbti": self.mbti, + "country": self.country, + "profession": self.profession, + "source_entity_uuid": self.source_entity_uuid, + "source_entity_type": self.source_entity_type, + "created_at": self.created_at, + } + + def to_dict(self) -> Dict[str, Any]: + """Full dict representation including relational fields.""" + base = super().to_dict() + base.update({ + "relational_link_type": self.relational_link_type, + "relational_seniority_years": self.relational_seniority_years, + "relational_trust_level": self.relational_trust_level, + "financial_sensitivity": self.financial_sensitivity, + "equity_tolerance": self.equity_tolerance, + "institutional_loyalty": self.institutional_loyalty, + "private_reaction_mode": self.private_reaction_mode, + "cascade_influence": self.cascade_influence, + }) + return base + + +# ── PrivateImpactProfileGenerator ──────────────────────────────────────────── + +class PrivateImpactProfileGenerator(OasisProfileGenerator): + """ + Generates RelationalAgentProfile instances for the Private Impact simulation. + + Extends OasisProfileGenerator with: + - Relational entity types (Employee, Manager, Client, ...) + - LLM prompt enriched with relational dimensions + - Relational rule-based fallback by entity type + - persona encoding that injects relational context as natural language + + Pipeline (same as OasisProfileGenerator): + EntityNode (Zep) → _build_entity_context() → LLM / rule-based + → RelationalAgentProfile (relational fields encoded into persona) + """ + + # Relational entity types — map to default behavioral parameters + RELATIONAL_ENTITY_TYPES = [ + "employee", "manager", "client", "competitor", + "partner", "familymember", "colleague", "investor", + ] + + # Default behavioral parameters by relational type + # (trust_level, financial_sensitivity, equity_tolerance, + # institutional_loyalty, reaction_mode, activity_level, active_hours) + RELATIONAL_DEFAULTS: Dict[str, Dict[str, Any]] = { + "employee": { + "trust_level": 0.6, + "financial_sensitivity": 0.7, + "equity_tolerance": 0.4, + "institutional_loyalty": 0.6, + "reaction_mode": "internalize", + "activity_level": 0.6, + "active_hours": list(range(8, 19)), + "influence_weight": 0.8, + }, + "manager": { + "trust_level": 0.7, + "financial_sensitivity": 0.5, + "equity_tolerance": 0.5, + "institutional_loyalty": 0.7, + "reaction_mode": "confront", + "activity_level": 0.5, + "active_hours": list(range(8, 20)), + "influence_weight": 1.5, + }, + "client": { + "trust_level": 0.4, + "financial_sensitivity": 0.3, + "equity_tolerance": 0.6, + "institutional_loyalty": 0.3, + "reaction_mode": "silent_leave", + "activity_level": 0.3, + "active_hours": list(range(9, 13)) + list(range(17, 21)), + "influence_weight": 1.2, + }, + "competitor": { + "trust_level": 0.2, + "financial_sensitivity": 0.4, + "equity_tolerance": 0.7, + "institutional_loyalty": 0.1, + "reaction_mode": "coalition_build", + "activity_level": 0.2, + "active_hours": list(range(9, 19)), + "influence_weight": 1.0, + }, + "partner": { + "trust_level": 0.6, + "financial_sensitivity": 0.4, + "equity_tolerance": 0.6, + "institutional_loyalty": 0.5, + "reaction_mode": "internalize", + "activity_level": 0.4, + "active_hours": list(range(9, 18)), + "influence_weight": 1.3, + }, + "familymember": { + "trust_level": 0.8, + "financial_sensitivity": 0.8, + "equity_tolerance": 0.5, + "institutional_loyalty": 0.2, + "reaction_mode": "confront", + "activity_level": 0.7, + "active_hours": list(range(7, 10)) + list(range(18, 24)), + "influence_weight": 0.9, + }, + } + + # Reaction mode descriptions for LLM prompt injection + REACTION_MODE_DESCRIPTIONS: Dict[str, str] = { + "internalize": "processes the news internally without immediate visible action; absorbs tension before potentially acting later", + "confront": "tends to address the issue head-on, speaking directly to the decision maker or raising concerns openly", + "silent_leave": "quietly disengages — reduces commitment, starts looking for alternatives, without announcing it", + "coalition_build": "looks for allies among peers before taking any visible action; builds shared narratives", + } + + def generate_profile_from_entity( + self, + entity: EntityNode, + user_id: int, + use_llm: bool = True, + cascade_influence: Optional[List[int]] = None, + ) -> RelationalAgentProfile: + """ + Generate a RelationalAgentProfile from a Zep entity node. + + Divergence from OasisProfileGenerator.generate_profile_from_entity: + Returns RelationalAgentProfile instead of OasisAgentProfile. + Relational dimensions are encoded into the persona text field. + + Args: + entity: Zep entity node. + user_id: Agent ID in the simulation. + use_llm: Whether to use LLM for profile generation. + cascade_influence: List of agent_ids this agent can expose (optional). + + Returns: + RelationalAgentProfile with relational context encoded in persona. + """ + entity_type = entity.get_entity_type() or "peer" + name = entity.name + user_name = self._generate_username(name) + context = self._build_entity_context(entity) + + if use_llm: + profile_data = self._generate_relational_profile_with_llm( + entity_name=name, + entity_type=entity_type, + entity_summary=entity.summary, + entity_attributes=entity.attributes, + context=context, + ) + else: + profile_data = self._generate_relational_profile_rule_based( + entity_name=name, + entity_type=entity_type, + entity_summary=entity.summary, + ) + + # Extract relational dimensions from LLM output + relational_link_type = profile_data.get("relational_link_type", "peer") + seniority_years = int(profile_data.get("relational_seniority_years", 0)) + trust_level = float(profile_data.get("relational_trust_level", 0.5)) + financial_sensitivity = float(profile_data.get("financial_sensitivity", 0.5)) + equity_tolerance = float(profile_data.get("equity_tolerance", 0.5)) + institutional_loyalty = float(profile_data.get("institutional_loyalty", 0.5)) + reaction_mode = profile_data.get("private_reaction_mode", "internalize") + + # Clamp floats to [0.0, 1.0] + trust_level = max(0.0, min(1.0, trust_level)) + financial_sensitivity = max(0.0, min(1.0, financial_sensitivity)) + equity_tolerance = max(0.0, min(1.0, equity_tolerance)) + institutional_loyalty = max(0.0, min(1.0, institutional_loyalty)) + + # Encode relational context into the persona text + base_persona = profile_data.get( + "persona", entity.summary or f"A {entity_type} named {name}." + ) + enriched_persona = self._encode_relational_persona( + base_persona=base_persona, + name=name, + relational_link_type=relational_link_type, + seniority_years=seniority_years, + trust_level=trust_level, + financial_sensitivity=financial_sensitivity, + equity_tolerance=equity_tolerance, + institutional_loyalty=institutional_loyalty, + reaction_mode=reaction_mode, + ) + + return RelationalAgentProfile( + user_id=user_id, + user_name=user_name, + name=name, + bio=profile_data.get("bio", f"{entity_type}: {name}"), + persona=enriched_persona, + karma=profile_data.get("karma", random.randint(500, 3000)), + friend_count=profile_data.get("friend_count", random.randint(20, 300)), + follower_count=profile_data.get("follower_count", random.randint(30, 500)), + statuses_count=profile_data.get("statuses_count", random.randint(50, 1000)), + age=profile_data.get("age"), + gender=profile_data.get("gender"), + mbti=profile_data.get("mbti"), + country=profile_data.get("country"), + profession=profile_data.get("profession"), + interested_topics=profile_data.get("interested_topics", []), + source_entity_uuid=entity.uuid, + source_entity_type=entity_type, + relational_link_type=relational_link_type, + relational_seniority_years=seniority_years, + relational_trust_level=trust_level, + financial_sensitivity=financial_sensitivity, + equity_tolerance=equity_tolerance, + institutional_loyalty=institutional_loyalty, + private_reaction_mode=reaction_mode, + cascade_influence=cascade_influence or [], + ) + + # ── Persona encoding ────────────────────────────────────────────────────── + + def _encode_relational_persona( + self, + base_persona: str, + name: str, + relational_link_type: str, + seniority_years: int, + trust_level: float, + financial_sensitivity: float, + equity_tolerance: float, + institutional_loyalty: float, + reaction_mode: str, + ) -> str: + """ + Encode relational dimensions into natural language appended to persona. + + This is the central mechanism: OASIS (and our private simulation) inject + the persona field as-is into the LLM system prompt. By appending a + structured relational context block, we guide agent behavior without + modifying the simulation engine. + + Args: + base_persona: Base persona text from LLM or rule-based fallback. + name: Agent name. + relational_link_type: Type of relationship with the decision maker. + seniority_years: Years in this relational context. + trust_level: Trust level with decision maker (0–1). + financial_sensitivity: Sensitivity to wealth signals (0–1). + equity_tolerance: Tolerance for status disparities (0–1). + institutional_loyalty: Loyalty to the org vs the person (0–1). + reaction_mode: Natural reaction pattern. + + Returns: + Enriched persona string with relational context block appended. + """ + # Trust descriptor + if trust_level >= 0.75: + trust_desc = "very high" + elif trust_level >= 0.5: + trust_desc = "moderate" + elif trust_level >= 0.25: + trust_desc = "low" + else: + trust_desc = "very low" + + # Financial sensitivity descriptor + if financial_sensitivity >= 0.75: + fin_desc = "highly sensitive to wealth signals and perceived inequity" + elif financial_sensitivity >= 0.5: + fin_desc = "moderately sensitive to financial signals" + else: + fin_desc = "relatively indifferent to wealth signals" + + # Equity tolerance descriptor + if equity_tolerance <= 0.25: + eq_desc = "very low tolerance for status disparities — notices and resents inequalities" + elif equity_tolerance <= 0.5: + eq_desc = "moderate discomfort with status disparities" + else: + eq_desc = "accepts status differences as normal" + + reaction_desc = self.REACTION_MODE_DESCRIPTIONS.get( + reaction_mode, + "processes the situation and responds according to their character" + ) + + seniority_str = ( + f"{seniority_years} year{'s' if seniority_years != 1 else ''}" + if seniority_years > 0 else "recent" + ) + + loyalty_desc = ( + "strongly attached to the organization and its continuity" + if institutional_loyalty >= 0.7 + else "balanced between personal interests and organizational ones" + if institutional_loyalty >= 0.4 + else "primarily driven by personal interests over institutional ones" + ) + + relational_block = ( + f"\n\n--- Relational Context (Private Impact Simulation) ---\n" + f"Your name is {name}.\n" + f"Your relationship with the decision maker: {relational_link_type} " + f"({seniority_str} of shared history).\n" + f"Trust level with the decision maker: {trust_desc} ({trust_level:.1f}/1.0).\n" + f"Financial sensitivity: {fin_desc} (score: {financial_sensitivity:.1f}).\n" + f"Equity tolerance: {eq_desc} (score: {equity_tolerance:.1f}).\n" + f"Institutional loyalty: {loyalty_desc} (score: {institutional_loyalty:.1f}).\n" + f"Your natural reaction mode: {reaction_mode} — you {reaction_desc}.\n" + f"--- End Relational Context ---" + ) + + return base_persona + relational_block + + # ── LLM profile generation ──────────────────────────────────────────────── + + def _generate_relational_profile_with_llm( + self, + entity_name: str, + entity_type: str, + entity_summary: str, + entity_attributes: Dict[str, Any], + context: str, + ) -> Dict[str, Any]: + """ + Generate relational profile via LLM. + + Divergence from OasisProfileGenerator._generate_profile_with_llm: + Adds relational dimension fields to the JSON output schema. + Falls back to rule-based generation on failure (same pattern as parent). + + Args: + entity_name: Entity name. + entity_type: Entity type from Zep. + entity_summary: Entity summary from Zep. + entity_attributes: Entity attributes dict. + context: Enriched context from _build_entity_context(). + + Returns: + Profile data dict including relational dimensions. + """ + prompt = self._build_relational_persona_prompt( + entity_name=entity_name, + entity_type=entity_type, + entity_summary=entity_summary, + entity_attributes=entity_attributes, + context=context, + ) + system_prompt = ( + "You are an expert in organizational psychology and behavioral simulation. " + "Generate realistic relational agent profiles for private impact simulations. " + "Return valid JSON only — no markdown, no prose outside the JSON object. " + f"{get_language_instruction()}" + ) + + max_attempts = 3 + last_error = None + + for attempt in range(max_attempts): + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ], + response_format={"type": "json_object"}, + temperature=0.7 - (attempt * 0.1), + ) + + content = response.choices[0].message.content + finish_reason = response.choices[0].finish_reason + + if finish_reason == 'length': + logger.warning(f"LLM output truncated (attempt {attempt + 1}), attempting fix...") + content = self._fix_truncated_json(content) + + try: + result = json.loads(content) + + # Ensure required fields + if not result.get("bio"): + result["bio"] = entity_summary[:200] if entity_summary else f"{entity_type}: {entity_name}" + if not result.get("persona"): + result["persona"] = entity_summary or f"{entity_name} is a {entity_type}." + + return result + + except json.JSONDecodeError as je: + logger.warning(f"JSON parse failed (attempt {attempt + 1}): {str(je)[:80]}") + result = self._try_fix_json(content, entity_name, entity_type, entity_summary) + if result.get("_fixed"): + del result["_fixed"] + return result + last_error = je + + except Exception as e: + logger.warning(f"LLM call failed (attempt {attempt + 1}): {str(e)[:80]}") + last_error = e + time.sleep(1 * (attempt + 1)) + + logger.warning( + f"LLM profile generation failed after {max_attempts} attempts: {last_error}. " + f"Falling back to rule-based." + ) + return self._generate_relational_profile_rule_based( + entity_name=entity_name, + entity_type=entity_type, + entity_summary=entity_summary, + ) + + def _build_relational_persona_prompt( + self, + entity_name: str, + entity_type: str, + entity_summary: str, + entity_attributes: Dict[str, Any], + context: str, + ) -> str: + """ + Build the LLM prompt for relational profile generation. + + Divergence from parent _build_individual_persona_prompt: + Adds relational dimension fields to the JSON schema. + """ + attrs_str = json.dumps(entity_attributes, ensure_ascii=False) if entity_attributes else "none" + context_str = context[:3000] if context else "No additional context." + + return f"""Generate a relational agent profile for a private impact simulation. + +Entity name: {entity_name} +Entity type: {entity_type} +Entity summary: {entity_summary} +Entity attributes: {attrs_str} + +Context: +{context_str} + +Return a JSON object with these fields: + +1. bio: Short profile description (max 200 characters) +2. persona: Detailed behavioral description (plain text, no line breaks inside the string, ~500 words): + - Background, personality, professional history + - Emotional patterns and communication style + - Relationship with authority and institutions + - Known reactions to organizational decisions +3. age: Integer (or null) +4. gender: "male", "female", or "other" +5. mbti: MBTI type (e.g. "INTJ") or null +6. country: Country name +7. profession: Current role or function +8. interested_topics: Array of relevant topics + +Relational dimensions (required): +9. relational_link_type: One of "hierarchical", "client", "peer", "family", "competitor" +10. relational_seniority_years: Integer (years in this relational context) +11. relational_trust_level: Float 0.0–1.0 (trust in decision maker) +12. financial_sensitivity: Float 0.0–1.0 (sensitivity to wealth signals) +13. equity_tolerance: Float 0.0–1.0 (tolerance for status disparities) +14. institutional_loyalty: Float 0.0–1.0 (loyalty to org vs personal interests) +15. private_reaction_mode: One of "internalize", "confront", "silent_leave", "coalition_build" + +Rules: +- All string values must not contain literal line breaks +- persona must be a single continuous text paragraph +- Float values must be between 0.0 and 1.0 +- Infer relational dimensions from entity type and context when possible +""" + + # ── Rule-based fallback ─────────────────────────────────────────────────── + + def _generate_relational_profile_rule_based( + self, + entity_name: str, + entity_type: str, + entity_summary: str, + ) -> Dict[str, Any]: + """ + Generate relational profile using predefined defaults by entity type. + + Divergence from OasisProfileGenerator._generate_profile_rule_based: + Uses RELATIONAL_DEFAULTS table instead of social media entity types. + Covers: Employee, Manager, Client, Competitor, Partner, FamilyMember. + + Args: + entity_name: Entity name. + entity_type: Relational entity type. + entity_summary: Entity summary for persona fallback. + + Returns: + Profile data dict with relational dimensions set from defaults. + """ + type_key = entity_type.lower() + defaults = self.RELATIONAL_DEFAULTS.get(type_key, self.RELATIONAL_DEFAULTS["employee"]) + + base_persona = ( + entity_summary + or f"{entity_name} is a {entity_type} connected to the decision maker's network." + ) + + return { + "bio": ( + entity_summary[:150] + if entity_summary + else f"{entity_type}: {entity_name}" + ), + "persona": base_persona, + "age": random.randint(25, 55), + "gender": random.choice(["male", "female"]), + "mbti": random.choice(self.MBTI_TYPES), + "country": random.choice(self.COUNTRIES), + "profession": entity_type.capitalize(), + "interested_topics": ["Professional Development", "Organizational Dynamics"], + # Relational dimensions from defaults table + "relational_link_type": type_key if type_key in ( + "hierarchical", "client", "peer", "family", "competitor" + ) else "peer", + "relational_seniority_years": random.randint(1, 8), + "relational_trust_level": defaults["trust_level"], + "financial_sensitivity": defaults["financial_sensitivity"], + "equity_tolerance": defaults.get("equity_tolerance", 0.5), + "institutional_loyalty": defaults.get("institutional_loyalty", 0.5), + "private_reaction_mode": defaults["reaction_mode"], + } + + # ── Batch generation ────────────────────────────────────────────────────── + + def generate_profiles_from_entities( + self, + entities: List[EntityNode], + use_llm: bool = True, + progress_callback: Optional[callable] = None, + graph_id: Optional[str] = None, + parallel_count: int = 5, + realtime_output_path: Optional[str] = None, + cascade_influence_map: Optional[Dict[int, List[int]]] = None, + **kwargs, # absorb unused parent kwargs (output_platform, etc.) + ) -> List[RelationalAgentProfile]: + """ + Generate RelationalAgentProfile instances for all entities in parallel. + + Divergence from OasisProfileGenerator.generate_profiles_from_entities: + Returns RelationalAgentProfile instances. + Accepts cascade_influence_map to assign relational graph edges per agent. + + Args: + entities: List of Zep entity nodes. + use_llm: Whether to use LLM generation (falls back to rule-based). + progress_callback: Optional callback(current, total, message). + graph_id: Zep graph ID for context enrichment. + parallel_count: Number of concurrent generation threads. + realtime_output_path: Path to write profiles as they are generated. + cascade_influence_map: {agent_index: [influenced_agent_ids]}. + + Returns: + List of RelationalAgentProfile instances. + """ + if graph_id: + self.graph_id = graph_id + + cascade_influence_map = cascade_influence_map or {} + total = len(entities) + profiles: List[Optional[RelationalAgentProfile]] = [None] * total + completed_count = [0] + lock = Lock() + + def save_realtime() -> None: + """Write generated profiles to file as they complete.""" + if not realtime_output_path: + return + with lock: + existing = [p for p in profiles if p is not None] + if not existing: + return + try: + data = [p.to_private_format() for p in existing] + with open(realtime_output_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + except Exception as e: + logger.warning(f"Realtime save failed: {e}") + + current_locale = get_locale() + + def generate_single(idx: int, entity: EntityNode) -> tuple: + set_locale(current_locale) + entity_type = entity.get_entity_type() or "peer" + cascade = cascade_influence_map.get(idx, []) + + try: + profile = self.generate_profile_from_entity( + entity=entity, + user_id=idx, + use_llm=use_llm, + cascade_influence=cascade, + ) + self._print_generated_relational_profile(entity.name, entity_type, profile) + return idx, profile, None + + except Exception as e: + logger.error(f"Profile generation failed for {entity.name}: {e}") + fallback = RelationalAgentProfile( + user_id=idx, + user_name=self._generate_username(entity.name), + name=entity.name, + bio=f"{entity_type}: {entity.name}", + persona=( + entity.summary + or f"{entity.name} is a {entity_type} in the relational network." + ), + source_entity_uuid=entity.uuid, + source_entity_type=entity_type, + cascade_influence=cascade, + ) + return idx, fallback, str(e) + + logger.info( + f"Starting parallel profile generation — {total} entities, " + f"parallel_count={parallel_count}" + ) + print(f"\n{'='*60}") + print(f"Private Impact — Generating {total} relational profiles (parallel: {parallel_count})") + print(f"{'='*60}\n") + + with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_count) as executor: + future_map = { + executor.submit(generate_single, idx, entity): (idx, entity) + for idx, entity in enumerate(entities) + } + + for future in concurrent.futures.as_completed(future_map): + idx, entity = future_map[future] + entity_type = entity.get_entity_type() or "peer" + + try: + result_idx, profile, error = future.result() + profiles[result_idx] = profile + + with lock: + completed_count[0] += 1 + current = completed_count[0] + + save_realtime() + + if progress_callback: + progress_callback( + current, + total, + f"Done {current}/{total}: {entity.name} ({entity_type})" + ) + + if error: + logger.warning(f"[{current}/{total}] {entity.name} using fallback: {error}") + else: + logger.info(f"[{current}/{total}] Generated: {entity.name} ({entity_type})") + + except Exception as e: + logger.error(f"Error processing {entity.name}: {e}") + with lock: + completed_count[0] += 1 + + profiles[idx] = RelationalAgentProfile( + user_id=idx, + user_name=self._generate_username(entity.name), + name=entity.name, + bio=f"{entity_type}: {entity.name}", + persona=entity.summary or "A participant in the relational network.", + source_entity_uuid=entity.uuid, + source_entity_type=entity_type, + ) + save_realtime() + + valid_count = len([p for p in profiles if p is not None]) + print(f"\n{'='*60}") + print(f"Profile generation complete — {valid_count} relational agents ready") + print(f"{'='*60}\n") + + return [p for p in profiles if p is not None] + + def save_profiles( + self, + profiles: List[RelationalAgentProfile], + file_path: str, + platform: str = "private", + ) -> None: + """ + Save relational profiles to JSON. + + Divergence from OasisProfileGenerator.save_profiles: + Always uses to_private_format() — no CSV output, no Reddit/Twitter format. + The output is a JSON array of agent config dicts consumed by + run_private_simulation.py. + + Args: + profiles: List of RelationalAgentProfile instances. + file_path: Output path (.json). + platform: Ignored — always uses private format. + """ + data = [p.to_private_format() for p in profiles] + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + logger.info(f"Saved {len(profiles)} relational profiles to {file_path}") + + # ── Console output ──────────────────────────────────────────────────────── + + def _print_generated_relational_profile( + self, + entity_name: str, + entity_type: str, + profile: RelationalAgentProfile, + ) -> None: + """Print a summary of the generated relational profile to stdout.""" + separator = "-" * 70 + lines = [ + f"\n{separator}", + f"[Private Impact] Profile generated: {entity_name} ({entity_type})", + separator, + f"Name: {profile.name} | Link: {profile.relational_link_type} " + f"| Reaction: {profile.private_reaction_mode}", + f"Trust: {profile.relational_trust_level:.2f} " + f"| Fin.Sensitivity: {profile.financial_sensitivity:.2f} " + f"| Loyalty: {profile.institutional_loyalty:.2f}", + f"Cascade influence: {profile.cascade_influence}", + f"", + f"[Bio] {profile.bio}", + separator, + ] + print("\n".join(lines)) diff --git a/backend/app/services/private_impact_runner.py b/backend/app/services/private_impact_runner.py new file mode 100644 index 00000000..50f9035d --- /dev/null +++ b/backend/app/services/private_impact_runner.py @@ -0,0 +1,903 @@ +""" +Private Impact Runner + +Orchestrates the Private Impact simulation via subprocess, monitors +private/actions.jsonl for real-time state updates, and exposes the +interface used by the Flask /api/private-impact blueprint. + +Equivalent of simulation_runner.py for the Private Impact mode. + +Key differences from SimulationRunner: +- Single platform: "private" (no Twitter/Reddit split) +- Action log: {sim_dir}/private/actions.jsonl +- Config file: private_simulation_config.json +- Script: backend/scripts/run_private_simulation.py +- Time unit: simulated days (not hours) +- Cleanup removes private_simulation.db + private/ directory + +Note on SimulationLogManager.get_private_logger(): + SimulationLogManager (backend/scripts/action_logger.py) does NOT currently + expose get_private_logger(). run_private_simulation.py falls back directly + to PlatformActionLogger("private", simulation_dir). This method must be added + to action_logger.py in a future prompt — see CONTEXT.md. +""" + +import json +import os +import shutil +import signal +import subprocess +import sys +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from ..utils.logger import get_logger +from ..utils.locale import get_locale, set_locale +from .zep_graph_memory_updater import ZepGraphMemoryManager + +logger = get_logger('mirofish.private_impact_runner') + +IS_WINDOWS = sys.platform == 'win32' + + +# ── Enums ───────────────────────────────────────────────────────────────────── + +class PrivateRunnerStatus(str, Enum): + """Run state of the Private Impact simulation subprocess.""" + IDLE = "idle" + STARTING = "starting" + RUNNING = "running" + STOPPING = "stopping" + STOPPED = "stopped" + COMPLETED = "completed" + FAILED = "failed" + + +# ── Dataclasses ─────────────────────────────────────────────────────────────── + +@dataclass +class PrivateAgentAction: + """ + Single relational action record parsed from private/actions.jsonl. + + Equivalent of AgentAction for the private simulation mode. + No platform split — all actions are platform="private". + """ + round_num: int + timestamp: str + agent_id: int + agent_name: str + action_type: str # REACT_PRIVATELY | CONFRONT | COALITION_BUILD | + # SILENT_LEAVE | VOCAL_SUPPORT | DO_NOTHING + action_args: Dict[str, Any] = field(default_factory=dict) + result: Optional[str] = None + success: bool = True + + def to_dict(self) -> Dict[str, Any]: + return { + "round_num": self.round_num, + "timestamp": self.timestamp, + "platform": "private", + "agent_id": self.agent_id, + "agent_name": self.agent_name, + "action_type": self.action_type, + "action_args": self.action_args, + "result": self.result, + "success": self.success, + } + + +@dataclass +class PrivateSimulationRunState: + """ + Real-time run state for a Private Impact simulation. + + Equivalent of SimulationRunState for the private mode. + Uses private_* field names — no twitter_* / reddit_* split. + """ + simulation_id: str + runner_status: PrivateRunnerStatus = PrivateRunnerStatus.IDLE + + # Progress + private_current_round: int = 0 + private_total_rounds: int = 0 + private_simulated_days: int = 0 + private_total_days: int = 0 + + # Platform state (single: private) + private_running: bool = False + private_actions_count: int = 0 + private_completed: bool = False + + # Error + private_error: Optional[str] = None + + # Recent actions for frontend live display + recent_actions: List[PrivateAgentAction] = field(default_factory=list) + max_recent_actions: int = 50 + + # Timestamps + started_at: Optional[str] = None + updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) + completed_at: Optional[str] = None + + # Subprocess PID + process_pid: Optional[int] = None + + def add_action(self, action: PrivateAgentAction) -> None: + """Prepend action to recent_actions and increment actions counter.""" + self.recent_actions.insert(0, action) + if len(self.recent_actions) > self.max_recent_actions: + self.recent_actions = self.recent_actions[:self.max_recent_actions] + self.private_actions_count += 1 + self.updated_at = datetime.now().isoformat() + + def to_dict(self) -> Dict[str, Any]: + total = max(self.private_total_rounds, 1) + return { + "simulation_id": self.simulation_id, + "runner_status": self.runner_status.value, + "private_current_round": self.private_current_round, + "private_total_rounds": self.private_total_rounds, + "private_simulated_days": self.private_simulated_days, + "private_total_days": self.private_total_days, + "progress_percent": round(self.private_current_round / total * 100, 1), + "private_running": self.private_running, + "private_actions_count": self.private_actions_count, + "private_completed": self.private_completed, + "private_error": self.private_error, + "started_at": self.started_at, + "updated_at": self.updated_at, + "completed_at": self.completed_at, + "process_pid": self.process_pid, + } + + def to_detail_dict(self) -> Dict[str, Any]: + """Extended dict including recent actions.""" + result = self.to_dict() + result["recent_actions"] = [a.to_dict() for a in self.recent_actions] + return result + + +# ── PrivateImpactRunner ──────────────────────────────────────────────────────── + +class PrivateImpactRunner: + """ + Orchestrates Private Impact simulations. + + Equivalent of SimulationRunner for the private relational mode. + Launches run_private_simulation.py as a subprocess, monitors + private/actions.jsonl for state updates, and exposes the interface + consumed by the Flask /api/private-impact blueprint. + + Directory layout (under RUN_STATE_DIR/{simulation_id}/): + private_simulation_config.json — PrivateSimulationParameters.to_dict() + private/actions.jsonl — relational action log + simulation.log — subprocess stdout + stderr + run_state.json — persisted PrivateSimulationRunState + """ + + RUN_STATE_DIR = os.path.join( + os.path.dirname(__file__), + '../../uploads/simulations' + ) + SCRIPTS_DIR = os.path.join( + os.path.dirname(__file__), + '../../scripts' + ) + + CONFIG_FILENAME = "private_simulation_config.json" + SCRIPT_NAME = "run_private_simulation.py" + + # Class-level in-memory state (same pattern as SimulationRunner) + _run_states: Dict[str, PrivateSimulationRunState] = {} + _processes: Dict[str, subprocess.Popen] = {} + _monitor_threads: Dict[str, threading.Thread] = {} + _stdout_files: Dict[str, Any] = {} + _graph_memory_enabled: Dict[str, bool] = {} + + # ── Public API ───────────────────────────────────────────────────────────── + + @classmethod + def get_status(cls, simulation_id: str) -> Optional[PrivateSimulationRunState]: + """ + Return the current run state for a simulation. + + Checks in-memory cache first, then falls back to disk + (same pattern as SimulationRunner.get_run_state). + + Args: + simulation_id: Simulation identifier. + + Returns: + PrivateSimulationRunState or None if not found. + """ + if simulation_id in cls._run_states: + return cls._run_states[simulation_id] + return cls._load_run_state(simulation_id) + + @classmethod + def start_simulation( + cls, + simulation_id: str, + max_rounds: Optional[int] = None, + enable_graph_memory_update: bool = False, + graph_id: Optional[str] = None, + ) -> PrivateSimulationRunState: + """ + Launch the private impact simulation subprocess. + + Same mechanics as SimulationRunner.start_simulation (L.387–399): + - Reads private_simulation_config.json from the simulation directory + - Spawns run_private_simulation.py with start_new_session=True + - Redirects stdout/stderr to simulation.log + - Launches a background monitor thread + + Args: + simulation_id: Unique simulation identifier. + max_rounds: Optional upper bound on simulation rounds. + enable_graph_memory_update: Push activity updates to Zep graph. + graph_id: Required when enable_graph_memory_update=True. + + Returns: + PrivateSimulationRunState with status=STARTING. + + Raises: + ValueError: If already running, config missing, or graph_id absent. + """ + existing = cls.get_status(simulation_id) + if existing and existing.runner_status in ( + PrivateRunnerStatus.RUNNING, PrivateRunnerStatus.STARTING + ): + raise ValueError(f"Private simulation already running: {simulation_id}") + + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + config_path = os.path.join(sim_dir, cls.CONFIG_FILENAME) + + if not os.path.exists(config_path): + raise ValueError( + f"Private simulation config not found: {config_path}. " + "Call /prepare first to generate the config." + ) + + with open(config_path, 'r', encoding='utf-8') as f: + config = json.load(f) + + time_cfg = config.get("time_config", {}) + total_days = time_cfg.get("total_simulation_days", 30) + rounds_per_day = time_cfg.get("rounds_per_day", 3) + total_rounds = total_days * rounds_per_day + + if max_rounds is not None and max_rounds > 0: + total_rounds = min(total_rounds, max_rounds) + logger.info( + f"[PRIVATE] Rounds capped to {total_rounds} " + f"(max_rounds={max_rounds})" + ) + + state = PrivateSimulationRunState( + simulation_id=simulation_id, + runner_status=PrivateRunnerStatus.STARTING, + private_total_rounds=total_rounds, + private_total_days=total_days, + private_running=True, + started_at=datetime.now().isoformat(), + ) + cls._save_run_state(state) + + # Optional Zep graph memory update + if enable_graph_memory_update: + if not graph_id: + raise ValueError( + "graph_id is required when enable_graph_memory_update=True" + ) + try: + ZepGraphMemoryManager.create_updater(simulation_id, graph_id) + cls._graph_memory_enabled[simulation_id] = True + logger.info( + f"[PRIVATE] Graph memory update enabled: " + f"simulation_id={simulation_id}, graph_id={graph_id}" + ) + except Exception as e: + logger.error(f"[PRIVATE] Failed to create graph memory updater: {e}") + cls._graph_memory_enabled[simulation_id] = False + else: + cls._graph_memory_enabled[simulation_id] = False + + script_path = os.path.join(cls.SCRIPTS_DIR, cls.SCRIPT_NAME) + if not os.path.exists(script_path): + raise ValueError(f"Script not found: {script_path}") + + try: + cmd = [sys.executable, script_path, "--config", config_path] + if max_rounds is not None and max_rounds > 0: + cmd.extend(["--max-rounds", str(max_rounds)]) + + main_log_path = os.path.join(sim_dir, "simulation.log") + main_log_file = open(main_log_path, 'w', encoding='utf-8') + + env = os.environ.copy() + env['PYTHONUTF8'] = '1' + env['PYTHONIOENCODING'] = 'utf-8' + + process = subprocess.Popen( + cmd, + cwd=sim_dir, + stdout=main_log_file, + stderr=subprocess.STDOUT, + text=True, + encoding='utf-8', + bufsize=1, + env=env, + start_new_session=True, + ) + + cls._stdout_files[simulation_id] = main_log_file + state.process_pid = process.pid + state.runner_status = PrivateRunnerStatus.RUNNING + cls._processes[simulation_id] = process + cls._save_run_state(state) + + current_locale = get_locale() + monitor_thread = threading.Thread( + target=cls._monitor_simulation, + args=(simulation_id, current_locale), + daemon=True, + ) + monitor_thread.start() + cls._monitor_threads[simulation_id] = monitor_thread + + logger.info( + f"[PRIVATE] Simulation started: {simulation_id}, " + f"pid={process.pid}, total_rounds={total_rounds}, " + f"total_days={total_days}" + ) + + except Exception as e: + state.runner_status = PrivateRunnerStatus.FAILED + state.private_error = str(e) + state.private_running = False + cls._save_run_state(state) + raise + + return state + + @classmethod + def stop_simulation(cls, simulation_id: str) -> PrivateSimulationRunState: + """ + Stop a running private simulation with a clean SIGTERM. + + Same mechanics as SimulationRunner.stop_simulation. + + Args: + simulation_id: Simulation identifier. + + Returns: + Updated PrivateSimulationRunState with status=STOPPED. + + Raises: + ValueError: If simulation does not exist or is not running. + """ + state = cls.get_status(simulation_id) + if not state: + raise ValueError(f"Private simulation not found: {simulation_id}") + if state.runner_status not in ( + PrivateRunnerStatus.RUNNING, PrivateRunnerStatus.STARTING + ): + raise ValueError( + f"Private simulation is not running: " + f"{simulation_id}, status={state.runner_status}" + ) + + state.runner_status = PrivateRunnerStatus.STOPPING + cls._save_run_state(state) + + process = cls._processes.get(simulation_id) + if process and process.poll() is None: + try: + cls._terminate_process(process, simulation_id) + except ProcessLookupError: + pass + except Exception as e: + logger.error(f"[PRIVATE] Terminate failed: {simulation_id}, {e}") + try: + process.terminate() + process.wait(timeout=5) + except Exception: + process.kill() + + state.runner_status = PrivateRunnerStatus.STOPPED + state.private_running = False + state.completed_at = datetime.now().isoformat() + cls._save_run_state(state) + + if cls._graph_memory_enabled.get(simulation_id, False): + try: + ZepGraphMemoryManager.stop_updater(simulation_id) + except Exception as e: + logger.error(f"[PRIVATE] Failed to stop graph updater: {e}") + cls._graph_memory_enabled.pop(simulation_id, None) + + logger.info(f"[PRIVATE] Simulation stopped: {simulation_id}") + return state + + @classmethod + def get_all_actions( + cls, + simulation_id: str, + agent_id: Optional[int] = None, + round_num: Optional[int] = None, + ) -> List[PrivateAgentAction]: + """ + Read the complete private/actions.jsonl action history. + + Args: + simulation_id: Simulation identifier. + agent_id: Optional filter by agent ID. + round_num: Optional filter by round number. + + Returns: + List of PrivateAgentAction sorted by timestamp descending. + """ + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + log_path = os.path.join(sim_dir, "private", "actions.jsonl") + actions = cls._read_actions_from_file( + log_path, agent_id=agent_id, round_num=round_num + ) + actions.sort(key=lambda a: a.timestamp, reverse=True) + return actions + + @classmethod + def cleanup(cls, simulation_id: str) -> Dict[str, Any]: + """ + Remove private simulation artifacts to allow a fresh restart. + + Deletes: + - run_state.json + - simulation.log + - private_simulation.db + - private/ directory (contains actions.jsonl) + + Does NOT delete: private_simulation_config.json, profile files. + + Args: + simulation_id: Simulation identifier. + + Returns: + Dict with keys: success (bool), cleaned_files (list), errors (list|None). + """ + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + if not os.path.exists(sim_dir): + return {"success": True, "cleaned_files": [], "errors": None} + + cleaned: List[str] = [] + errors: List[str] = [] + + for filename in ("run_state.json", "simulation.log", "private_simulation.db"): + path = os.path.join(sim_dir, filename) + if os.path.exists(path): + try: + os.remove(path) + cleaned.append(filename) + except Exception as e: + errors.append(f"Failed to delete {filename}: {e}") + + private_dir = os.path.join(sim_dir, "private") + if os.path.exists(private_dir): + try: + shutil.rmtree(private_dir) + cleaned.append("private/") + except Exception as e: + errors.append(f"Failed to delete private/: {e}") + + cls._run_states.pop(simulation_id, None) + + logger.info( + f"[PRIVATE] Cleanup done: {simulation_id}, removed={cleaned}" + ) + return { + "success": len(errors) == 0, + "cleaned_files": cleaned, + "errors": errors or None, + } + + # ── Internal: monitor thread ─────────────────────────────────────────────── + + @classmethod + def _monitor_simulation(cls, simulation_id: str, locale: str = 'en') -> None: + """ + Background thread: poll private/actions.jsonl until subprocess exits. + + Same pattern as SimulationRunner._monitor_simulation (L.482–581): + - Loops while process is alive, reading new log lines every 2 s + - Performs a final read after process exit + - Sets COMPLETED or FAILED based on exit code + - Stops graph memory updater in finally block + + Args: + simulation_id: Simulation identifier. + locale: Locale inherited from the calling thread. + """ + set_locale(locale) + sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id) + private_log = os.path.join(sim_dir, "private", "actions.jsonl") + + process = cls._processes.get(simulation_id) + state = cls.get_status(simulation_id) + + if not process or not state: + return + + log_position = 0 + + try: + while process.poll() is None: + if os.path.exists(private_log): + log_position = cls._read_action_log( + private_log, log_position, state + ) + cls._save_run_state(state) + time.sleep(2) + + # Final read after process exits + if os.path.exists(private_log): + cls._read_action_log(private_log, log_position, state) + + exit_code = process.returncode + if exit_code == 0: + state.runner_status = PrivateRunnerStatus.COMPLETED + state.completed_at = datetime.now().isoformat() + logger.info(f"[PRIVATE] Simulation completed: {simulation_id}") + else: + state.runner_status = PrivateRunnerStatus.FAILED + main_log = os.path.join(sim_dir, "simulation.log") + error_tail = "" + try: + if os.path.exists(main_log): + with open(main_log, 'r', encoding='utf-8') as f: + error_tail = f.read()[-2000:] + except Exception: + pass + state.private_error = ( + f"Process exited with code {exit_code}. " + f"Last log output: {error_tail}" + ) + logger.error( + f"[PRIVATE] Simulation failed: {simulation_id}, " + f"exit_code={exit_code}" + ) + + state.private_running = False + cls._save_run_state(state) + + except Exception as e: + logger.error(f"[PRIVATE] Monitor thread error: {simulation_id}, {e}") + state.runner_status = PrivateRunnerStatus.FAILED + state.private_error = str(e) + cls._save_run_state(state) + + finally: + if cls._graph_memory_enabled.get(simulation_id, False): + try: + ZepGraphMemoryManager.stop_updater(simulation_id) + logger.info( + f"[PRIVATE] Graph memory updater stopped: {simulation_id}" + ) + except Exception as e: + logger.error( + f"[PRIVATE] Failed to stop graph updater: {e}" + ) + cls._graph_memory_enabled.pop(simulation_id, None) + + cls._processes.pop(simulation_id, None) + + if simulation_id in cls._stdout_files: + try: + cls._stdout_files[simulation_id].close() + except Exception: + pass + cls._stdout_files.pop(simulation_id, None) + + # ── Internal: log reader ─────────────────────────────────────────────────── + + @classmethod + def _read_action_log( + cls, + log_path: str, + position: int, + state: PrivateSimulationRunState, + ) -> int: + """ + Incremental read of private/actions.jsonl from a byte offset. + + Same pattern as SimulationRunner._read_action_log (L.683–684): + - Seeks to last read position, reads new lines only + - Calls ZepGraphMemoryUpdater.add_activity_from_dict(data, "private") + - Handles round_end and simulation_end event entries + + Args: + log_path: Absolute path to private/actions.jsonl. + position: Byte offset of the previous read. + state: Mutable run state to update in place. + + Returns: + New byte offset after reading. + """ + graph_memory_enabled = cls._graph_memory_enabled.get( + state.simulation_id, False + ) + graph_updater = None + if graph_memory_enabled: + graph_updater = ZepGraphMemoryManager.get_updater(state.simulation_id) + + try: + with open(log_path, 'r', encoding='utf-8') as f: + f.seek(position) + for line in f: + line = line.strip() + if not line: + continue + try: + data = json.loads(line) + + # Structured event entries (no agent_id) + if "event_type" in data: + event_type = data["event_type"] + + if event_type == "simulation_end": + state.private_completed = True + state.private_running = False + state.runner_status = PrivateRunnerStatus.COMPLETED + state.completed_at = datetime.now().isoformat() + logger.info( + f"[PRIVATE] simulation_end received: " + f"{state.simulation_id}, " + f"total_rounds={data.get('total_rounds')}, " + f"total_actions={data.get('total_actions')}" + ) + + elif event_type == "round_end": + round_num = data.get("round", 0) + if round_num > state.private_current_round: + state.private_current_round = round_num + # simulated_day may be written by run_private_simulation.py + simulated_day = data.get("simulated_day", 0) + if simulated_day > state.private_simulated_days: + state.private_simulated_days = simulated_day + + continue + + # Skip non-agent entries + if "agent_id" not in data: + continue + + action = PrivateAgentAction( + round_num=data.get("round", 0), + timestamp=data.get( + "timestamp", datetime.now().isoformat() + ), + agent_id=data.get("agent_id", 0), + agent_name=data.get("agent_name", ""), + action_type=data.get("action_type", ""), + action_args=data.get("action_args", {}), + result=data.get("result"), + success=data.get("success", True), + ) + state.add_action(action) + + if action.round_num > state.private_current_round: + state.private_current_round = action.round_num + + # Push to Zep graph memory with platform="private" + if graph_updater: + graph_updater.add_activity_from_dict(data, "private") + + except json.JSONDecodeError: + pass + return f.tell() + + except Exception as e: + logger.warning( + f"[PRIVATE] Failed to read action log: {log_path}, {e}" + ) + return position + + # ── Internal: persistence ───────────────────────────────────────────────── + + @classmethod + def _save_run_state(cls, state: PrivateSimulationRunState) -> None: + """Persist run state to run_state.json and update in-memory cache.""" + sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id) + os.makedirs(sim_dir, exist_ok=True) + state_file = os.path.join(sim_dir, "run_state.json") + with open(state_file, 'w', encoding='utf-8') as f: + json.dump(state.to_detail_dict(), f, ensure_ascii=False, indent=2) + cls._run_states[state.simulation_id] = state + + @classmethod + def _load_run_state( + cls, simulation_id: str + ) -> Optional[PrivateSimulationRunState]: + """ + Load run state from disk. + + Same pattern as SimulationRunner._load_run_state. + + Args: + simulation_id: Simulation identifier. + + Returns: + PrivateSimulationRunState or None on failure / missing file. + """ + state_file = os.path.join( + cls.RUN_STATE_DIR, simulation_id, "run_state.json" + ) + if not os.path.exists(state_file): + return None + + try: + with open(state_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + state = PrivateSimulationRunState( + simulation_id=simulation_id, + runner_status=PrivateRunnerStatus( + data.get("runner_status", "idle") + ), + private_current_round=data.get("private_current_round", 0), + private_total_rounds=data.get("private_total_rounds", 0), + private_simulated_days=data.get("private_simulated_days", 0), + private_total_days=data.get("private_total_days", 0), + private_running=data.get("private_running", False), + private_actions_count=data.get("private_actions_count", 0), + private_completed=data.get("private_completed", False), + private_error=data.get("private_error"), + started_at=data.get("started_at"), + updated_at=data.get("updated_at", datetime.now().isoformat()), + completed_at=data.get("completed_at"), + process_pid=data.get("process_pid"), + ) + + for a in data.get("recent_actions", []): + state.recent_actions.append(PrivateAgentAction( + round_num=a.get("round_num", 0), + timestamp=a.get("timestamp", ""), + agent_id=a.get("agent_id", 0), + agent_name=a.get("agent_name", ""), + action_type=a.get("action_type", ""), + action_args=a.get("action_args", {}), + result=a.get("result"), + success=a.get("success", True), + )) + + cls._run_states[simulation_id] = state + return state + + except Exception as e: + logger.error( + f"[PRIVATE] Failed to load run state: {simulation_id}, {e}" + ) + return None + + @classmethod + def _read_actions_from_file( + cls, + file_path: str, + agent_id: Optional[int] = None, + round_num: Optional[int] = None, + ) -> List[PrivateAgentAction]: + """ + Read all agent actions from a JSONL file with optional filters. + + Args: + file_path: Path to actions.jsonl. + agent_id: Optional filter by agent ID. + round_num: Optional filter by round number. + + Returns: + List of PrivateAgentAction instances. + """ + if not os.path.exists(file_path): + return [] + + actions: List[PrivateAgentAction] = [] + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line: + continue + try: + data = json.loads(line) + if "event_type" in data: + continue + if "agent_id" not in data: + continue + if agent_id is not None and data.get("agent_id") != agent_id: + continue + if round_num is not None and data.get("round") != round_num: + continue + actions.append(PrivateAgentAction( + round_num=data.get("round", 0), + timestamp=data.get("timestamp", ""), + agent_id=data.get("agent_id", 0), + agent_name=data.get("agent_name", ""), + action_type=data.get("action_type", ""), + action_args=data.get("action_args", {}), + result=data.get("result"), + success=data.get("success", True), + )) + except json.JSONDecodeError: + continue + + return actions + + # ── Internal: process management ────────────────────────────────────────── + + @classmethod + def _terminate_process( + cls, + process: subprocess.Popen, + simulation_id: str, + timeout: int = 10, + ) -> None: + """ + Terminate subprocess and its children cross-platform. + + Same implementation as SimulationRunner._terminate_process: + - Windows: taskkill /PID /T, then /F if unresponsive + - Unix: SIGTERM to process group, SIGKILL on timeout + + Args: + process: Subprocess to terminate. + simulation_id: Simulation ID for logging. + timeout: Seconds to wait before force-killing. + """ + if IS_WINDOWS: + logger.info( + f"[PRIVATE] Terminating process tree (Windows): " + f"simulation={simulation_id}, pid={process.pid}" + ) + try: + subprocess.run( + ['taskkill', '/PID', str(process.pid), '/T'], + capture_output=True, timeout=5 + ) + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + logger.warning( + f"[PRIVATE] Process unresponsive, force killing: " + f"{simulation_id}" + ) + subprocess.run( + ['taskkill', '/F', '/PID', str(process.pid), '/T'], + capture_output=True, timeout=5 + ) + process.wait(timeout=5) + except Exception as e: + logger.warning(f"[PRIVATE] taskkill failed, falling back: {e}") + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + else: + pgid = os.getpgid(process.pid) + logger.info( + f"[PRIVATE] Terminating process group (Unix): " + f"simulation={simulation_id}, pgid={pgid}" + ) + os.killpg(pgid, signal.SIGTERM) + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + logger.warning( + f"[PRIVATE] Process group unresponsive, force killing: " + f"{simulation_id}" + ) + os.killpg(pgid, signal.SIGKILL) + process.wait(timeout=5) diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index e86021f8..5053ca17 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -125,7 +125,14 @@ class SimulationRunState: # 平台完成状态(通过检测 actions.jsonl 中的 simulation_end 事件) twitter_completed: bool = False reddit_completed: bool = False - + + # Private Impact platform state + private_current_round: int = 0 + private_simulated_days: int = 0 + private_running: bool = False + private_actions_count: int = 0 + private_completed: bool = False + # 每轮摘要 rounds: List[RoundSummary] = field(default_factory=list) @@ -152,6 +159,8 @@ class SimulationRunState: if action.platform == "twitter": self.twitter_actions_count += 1 + elif action.platform == "private": + self.private_actions_count += 1 else: self.reddit_actions_count += 1 @@ -177,7 +186,12 @@ class SimulationRunState: "reddit_completed": self.reddit_completed, "twitter_actions_count": self.twitter_actions_count, "reddit_actions_count": self.reddit_actions_count, - "total_actions_count": self.twitter_actions_count + self.reddit_actions_count, + "private_current_round": self.private_current_round, + "private_simulated_days": self.private_simulated_days, + "private_running": self.private_running, + "private_completed": self.private_completed, + "private_actions_count": self.private_actions_count, + "total_actions_count": self.twitter_actions_count + self.reddit_actions_count + self.private_actions_count, "started_at": self.started_at, "updated_at": self.updated_at, "completed_at": self.completed_at, @@ -268,6 +282,11 @@ class SimulationRunner: reddit_completed=data.get("reddit_completed", False), twitter_actions_count=data.get("twitter_actions_count", 0), reddit_actions_count=data.get("reddit_actions_count", 0), + private_current_round=data.get("private_current_round", 0), + private_simulated_days=data.get("private_simulated_days", 0), + private_running=data.get("private_running", False), + private_completed=data.get("private_completed", False), + private_actions_count=data.get("private_actions_count", 0), started_at=data.get("started_at"), updated_at=data.get("updated_at", datetime.now().isoformat()), completed_at=data.get("completed_at"), @@ -391,6 +410,9 @@ class SimulationRunner: elif platform == "reddit": script_name = "run_reddit_simulation.py" state.reddit_running = True + elif platform == "private": + script_name = "run_private_simulation.py" + state.private_running = True else: script_name = "run_parallel_simulation.py" state.twitter_running = True @@ -487,15 +509,17 @@ class SimulationRunner: # 新的日志结构:分平台的动作日志 twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl") reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl") - + private_actions_log = os.path.join(sim_dir, "private", "actions.jsonl") + process = cls._processes.get(simulation_id) state = cls.get_run_state(simulation_id) - + if not process or not state: return - + twitter_position = 0 reddit_position = 0 + private_position = 0 try: while process.poll() is None: # 进程仍在运行 @@ -510,7 +534,13 @@ class SimulationRunner: reddit_position = cls._read_action_log( reddit_actions_log, reddit_position, state, "reddit" ) - + + # 读取 Private 动作日志 + if os.path.exists(private_actions_log): + private_position = cls._read_action_log( + private_actions_log, private_position, state, "private" + ) + # 更新状态 cls._save_run_state(state) time.sleep(2) @@ -520,6 +550,8 @@ class SimulationRunner: cls._read_action_log(twitter_actions_log, twitter_position, state, "twitter") if os.path.exists(reddit_actions_log): cls._read_action_log(reddit_actions_log, reddit_position, state, "reddit") + if os.path.exists(private_actions_log): + cls._read_action_log(private_actions_log, private_position, state, "private") # 进程结束 exit_code = process.returncode @@ -544,8 +576,9 @@ class SimulationRunner: state.twitter_running = False state.reddit_running = False + state.private_running = False cls._save_run_state(state) - + except Exception as e: logger.error(f"监控线程异常: {simulation_id}, error={str(e)}") state.runner_status = RunnerStatus.FAILED @@ -629,6 +662,10 @@ class SimulationRunner: state.reddit_completed = True state.reddit_running = False logger.info(f"Reddit 模拟已完成: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") + elif platform == "private": + state.private_completed = True + state.private_running = False + logger.info(f"Private 模拟已完成: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") # 检查是否所有启用的平台都已完成 # 如果只运行了一个平台,只检查那个平台 @@ -653,7 +690,13 @@ class SimulationRunner: if round_num > state.reddit_current_round: state.reddit_current_round = round_num state.reddit_simulated_hours = simulated_hours - + elif platform == "private": + if round_num > state.private_current_round: + state.private_current_round = round_num + simulated_day = action_data.get("simulated_day", 0) + if simulated_day > state.private_simulated_days: + state.private_simulated_days = simulated_day + # 总体轮次取两个平台的最大值 if round_num > state.current_round: state.current_round = round_num @@ -703,19 +746,23 @@ class SimulationRunner: sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id) twitter_log = os.path.join(sim_dir, "twitter", "actions.jsonl") reddit_log = os.path.join(sim_dir, "reddit", "actions.jsonl") - + private_log = os.path.join(sim_dir, "private", "actions.jsonl") + # 检查哪些平台被启用(通过文件是否存在判断) twitter_enabled = os.path.exists(twitter_log) reddit_enabled = os.path.exists(reddit_log) - + private_enabled = os.path.exists(private_log) + # 如果平台被启用但未完成,则返回 False if twitter_enabled and not state.twitter_completed: return False if reddit_enabled and not state.reddit_completed: return False - + if private_enabled and not state.private_completed: + return False + # 至少有一个平台被启用且已完成 - return twitter_enabled or reddit_enabled + return twitter_enabled or reddit_enabled or private_enabled @classmethod def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10): @@ -806,9 +853,10 @@ class SimulationRunner: state.runner_status = RunnerStatus.STOPPED state.twitter_running = False state.reddit_running = False + state.private_running = False state.completed_at = datetime.now().isoformat() cls._save_run_state(state) - + # 停止图谱记忆更新器 if cls._graph_memory_enabled.get(simulation_id, False): try: @@ -934,7 +982,18 @@ class SimulationRunner: agent_id=agent_id, round_num=round_num )) - + + # 读取 Private 动作文件 + private_actions_log = os.path.join(sim_dir, "private", "actions.jsonl") + if not platform or platform == "private": + actions.extend(cls._read_actions_from_file( + private_actions_log, + default_platform="private", + platform_filter=platform, + agent_id=agent_id, + round_num=round_num + )) + # 如果分平台文件不存在,尝试读取旧的单一文件格式 if not actions: actions_log = os.path.join(sim_dir, "actions.jsonl") @@ -1140,11 +1199,12 @@ class SimulationRunner: "stderr.log", "twitter_simulation.db", # Twitter 平台数据库 "reddit_simulation.db", # Reddit 平台数据库 + "private_simulation.db", # Private Impact 平台数据库 "env_status.json", # 环境状态文件 ] - + # 要删除的目录列表(包含动作日志) - dirs_to_clean = ["twitter", "reddit"] + dirs_to_clean = ["twitter", "reddit", "private"] # 删除文件 for filename in files_to_delete: @@ -1236,6 +1296,7 @@ class SimulationRunner: state.runner_status = RunnerStatus.STOPPED state.twitter_running = False state.reddit_running = False + state.private_running = False state.completed_at = datetime.now().isoformat() state.error = "服务器关闭,模拟被终止" cls._save_run_state(state) diff --git a/backend/run.py b/backend/run.py index 4e3b04fa..18c5bbce 100644 --- a/backend/run.py +++ b/backend/run.py @@ -38,7 +38,7 @@ def main(): # 获取运行配置 host = os.environ.get('FLASK_HOST', '0.0.0.0') - port = int(os.environ.get('FLASK_PORT', 5001)) + port = int(os.environ.get('FLASK_PORT', 9902)) debug = Config.DEBUG # 启动服务 diff --git a/backend/scripts/action_logger.py b/backend/scripts/action_logger.py index 38d025a6..340b47fd 100644 --- a/backend/scripts/action_logger.py +++ b/backend/scripts/action_logger.py @@ -132,6 +132,7 @@ class SimulationLogManager: self.simulation_dir = simulation_dir self.twitter_logger: Optional[PlatformActionLogger] = None self.reddit_logger: Optional[PlatformActionLogger] = None + self.private_logger: Optional[PlatformActionLogger] = None self._main_logger: Optional[logging.Logger] = None # 设置主日志 @@ -177,7 +178,13 @@ class SimulationLogManager: if self.reddit_logger is None: self.reddit_logger = PlatformActionLogger("reddit", self.simulation_dir) return self.reddit_logger - + + def get_private_logger(self) -> PlatformActionLogger: + """获取 Private Impact 平台日志记录器""" + if self.private_logger is None: + self.private_logger = PlatformActionLogger("private", self.simulation_dir) + return self.private_logger + def log(self, message: str, level: str = "info"): """记录主日志""" if self._main_logger: diff --git a/backend/scripts/run_private_simulation.py b/backend/scripts/run_private_simulation.py new file mode 100644 index 00000000..d0c4ac07 --- /dev/null +++ b/backend/scripts/run_private_simulation.py @@ -0,0 +1,977 @@ +""" +Private Impact Simulation Script + +Simulates the impact of a private decision in a closed relational network. +Unlike Twitter/Reddit OASIS simulations, this mode has no social media platform: +no echo chamber, no recency weight, no asyncio.gather() across two platforms. + +Differences from run_parallel_simulation.py: +- No OASIS env / agent_graph (no Twitter, no Reddit, no PlatformConfig) +- Single relational simulation (no asyncio.gather parallel platforms) +- Relational actions: REACT_PRIVATELY, CONFRONT, COALITION_BUILD, + SILENT_LEAVE, VOCAL_SUPPORT, DO_NOTHING +- Propagation via cascade_influence graph: + distance 1 = direct exposure (initial_posts targets) + distance 2 = cascade via cascade_influence of reacting agents +- Direct LLM calls via camel-ai ChatAgent (no OASIS action loop) +- Output: backend/scripts/private/actions.jsonl (same JSONL format) +- zep_graph_memory_updater.py reused as-is (platform="private") + +Usage: + python run_private_simulation.py --config simulation_config.json + python run_private_simulation.py --config simulation_config.json --no-wait + python run_private_simulation.py --config simulation_config.json --max-rounds 10 + +Log structure: + sim_xxx/ + ├── private/ + │ └── actions.jsonl # Relational network action log + ├── simulation.log # Main simulation process log + └── run_state.json # Run state (API polling) +""" + +# ============================================================ +# Windows UTF-8 fix — same as run_parallel_simulation.py +# ============================================================ +import sys +import os + +if sys.platform == 'win32': + os.environ.setdefault('PYTHONUTF8', '1') + os.environ.setdefault('PYTHONIOENCODING', 'utf-8') + + if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(encoding='utf-8', errors='replace') + if hasattr(sys.stderr, 'reconfigure'): + sys.stderr.reconfigure(encoding='utf-8', errors='replace') + + import builtins + _original_open = builtins.open + + def _utf8_open(file, mode='r', buffering=-1, encoding=None, errors=None, + newline=None, closefd=True, opener=None): + """Wrap open() to default to UTF-8 for text mode — fixes third-party libs.""" + if encoding is None and 'b' not in mode: + encoding = 'utf-8' + return _original_open(file, mode, buffering, encoding, errors, + newline, closefd, opener) + + builtins.open = _utf8_open + +import argparse +import asyncio +import json +import logging +import random +import signal +from datetime import datetime +from typing import Any, Dict, List, Optional, Set, Tuple + +# ── Path setup (same as run_parallel_simulation.py) ────────────────────────── +_scripts_dir = os.path.dirname(os.path.abspath(__file__)) +_backend_dir = os.path.abspath(os.path.join(_scripts_dir, '..')) +_project_root = os.path.abspath(os.path.join(_backend_dir, '..')) +sys.path.insert(0, _scripts_dir) +sys.path.insert(0, _backend_dir) + +from dotenv import load_dotenv +_env_file = os.path.join(_project_root, '.env') +if os.path.exists(_env_file): + load_dotenv(_env_file) + print(f"Loaded env config: {_env_file}") +else: + _backend_env = os.path.join(_backend_dir, '.env') + if os.path.exists(_backend_env): + load_dotenv(_backend_env) + print(f"Loaded env config: {_backend_env}") + +# ── Logging helpers ─────────────────────────────────────────────────────────── + +class MaxTokensWarningFilter(logging.Filter): + """Suppress camel-ai max_tokens warnings (intentionally not set).""" + + def filter(self, record: logging.LogRecord) -> bool: + if "max_tokens" in record.getMessage() and "Invalid or missing" in record.getMessage(): + return False + return True + + +logging.getLogger().addFilter(MaxTokensWarningFilter()) + +from action_logger import SimulationLogManager, PlatformActionLogger + +try: + from camel.models import ModelFactory + from camel.types import ModelPlatformType + from camel.messages import BaseMessage + from camel.agents import ChatAgent +except ImportError as e: + print(f"Error: missing dependency {e}") + print("Please install: pip install camel-ai") + sys.exit(1) + +# Optional: Zep graph memory updater — same as run_parallel_simulation.py +try: + from app.services.zep_graph_memory_updater import AgentActivity, ZepGraphMemoryUpdater + _ZEP_AVAILABLE = True +except ImportError: + _ZEP_AVAILABLE = False + +# ── Global shutdown state ───────────────────────────────────────────────────── +_shutdown_event: Optional[asyncio.Event] = None +_cleanup_done: bool = False + +# ── Relational actions — no social media vocabulary ─────────────────────────── +# Divergence from run_parallel_simulation.py: +# TWITTER_ACTIONS / REDDIT_ACTIONS → PRIVATE_ACTIONS +PRIVATE_ACTIONS = [ + "REACT_PRIVATELY", # Internal reaction — changes agent state, not visible externally + "CONFRONT", # Direct confrontation with the decision maker + "COALITION_BUILD", # Rallies other network agents to a shared reaction + "SILENT_LEAVE", # Progressive disengagement (resignation, client churn...) + "VOCAL_SUPPORT", # Public or private defense of the decision + "DO_NOTHING", # Indifference — absorbs without reacting +] + +# IPC constants (same pattern as run_parallel_simulation.py) +IPC_COMMANDS_DIR = "ipc_commands" +IPC_RESPONSES_DIR = "ipc_responses" +ENV_STATUS_FILE = "env_status.json" + + +class CommandType: + INTERVIEW = "interview" + BATCH_INTERVIEW = "batch_interview" + CLOSE_ENV = "close_env" + + +# ── Config / model helpers ──────────────────────────────────────────────────── + +def load_config(config_path: str) -> Dict[str, Any]: + """Load simulation config JSON.""" + with open(config_path, 'r', encoding='utf-8') as f: + return json.load(f) + + +def create_model(config: Dict[str, Any]): + """ + Create LLM model from environment variables. + Same logic as run_parallel_simulation.py — no boost variant needed here + (single simulation, no parallel platforms to distribute load). + """ + llm_api_key = os.environ.get("LLM_API_KEY", "") + llm_base_url = os.environ.get("LLM_BASE_URL", "") + llm_model = os.environ.get("LLM_MODEL_NAME", "") or config.get("llm_model", "gpt-4o-mini") + + if llm_api_key: + os.environ["OPENAI_API_KEY"] = llm_api_key + + if not os.environ.get("OPENAI_API_KEY"): + raise ValueError("Missing API Key — set LLM_API_KEY in .env") + + if llm_base_url: + os.environ["OPENAI_API_BASE_URL"] = llm_base_url + + print(f"[Private] model={llm_model}, base_url={llm_base_url[:40] if llm_base_url else 'default'}...") + + return ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=llm_model, + ) + + +def get_agent_names_from_config(config: Dict[str, Any]) -> Dict[int, str]: + """Build agent_id → entity_name mapping from simulation_config.""" + agent_names = {} + for cfg in config.get("agent_configs", []): + agent_id = cfg.get("agent_id") + if agent_id is not None: + agent_names[agent_id] = cfg.get("entity_name", f"Agent_{agent_id}") + return agent_names + + +# ── Relational graph ────────────────────────────────────────────────────────── + +def build_relational_graph(agent_configs: List[Dict[str, Any]]) -> Dict[int, List[int]]: + """ + Build the cascade influence graph from agent configs. + + Divergence from run_parallel_simulation.py: + No OASIS platform graph — this is a relational influence graph where + cascade_influence[agent_id] = [list of agent_ids this agent can expose]. + + Returns: + {agent_id: [influenced_agent_ids]} + """ + graph: Dict[int, List[int]] = {} + for cfg in agent_configs: + agent_id = cfg.get("agent_id", 0) + graph[agent_id] = cfg.get("cascade_influence", []) + return graph + + +def get_initial_exposed_agents(config: Dict[str, Any]) -> Set[int]: + """ + Determine distance-1 agents: those directly targeted by initial_posts. + + Divergence from run_parallel_simulation.py: + Instead of posting on Twitter/Reddit, the decision maker (poster_agent_id=0) + announces the decision to agents listed in initial_posts targets. + All agent_ids mentioned in initial_posts (excluding the poster) are exposed. + """ + exposed: Set[int] = set() + event_config = config.get("event_config", {}) + for post in event_config.get("initial_posts", []): + poster_id = post.get("poster_agent_id", 0) + # All agents except the poster are exposed at distance 1 + for cfg in config.get("agent_configs", []): + agent_id = cfg.get("agent_id") + if agent_id is not None and agent_id != poster_id: + exposed.add(agent_id) + return exposed + + +def get_decision_context(config: Dict[str, Any]) -> str: + """ + Extract the triggering decision text from event_config.initial_posts. + + Reuses the initial_posts mechanism but changes its semantic: + instead of a Twitter post, it is the private decision announcement. + """ + event_config = config.get("event_config", {}) + posts = event_config.get("initial_posts", []) + if posts: + return posts[0].get("content", "A private decision has been made.") + return "A private decision has been made." + + +# ── Active agent selection ──────────────────────────────────────────────────── + +def get_active_agents_for_round_private( + agent_configs: List[Dict[str, Any]], + exposed_agents: Set[int], + current_hour: int, + round_num: int, + time_config: Dict[str, Any], +) -> List[Dict[str, Any]]: + """ + Select active agents for this round. + + Divergence from run_parallel_simulation.py: + Only exposed agents are eligible (relational propagation gate). + Active hours and activity_level logic is preserved from the original. + + Args: + agent_configs: Full list of agent configs. + exposed_agents: Set of agent_ids who have received the decision. + current_hour: Simulated hour (0-23). + round_num: Current round number. + time_config: Time configuration from simulation config. + + Returns: + List of agent config dicts for agents that will act this round. + """ + base_min = time_config.get("agents_per_hour_min", 3) + base_max = time_config.get("agents_per_hour_max", 10) + + peak_hours = time_config.get("peak_hours", [9, 10, 11, 14, 15, 20, 21, 22]) + off_peak_hours = time_config.get("off_peak_hours", [0, 1, 2, 3, 4, 5]) + + if current_hour in peak_hours: + multiplier = time_config.get("peak_activity_multiplier", 1.5) + elif current_hour in off_peak_hours: + multiplier = time_config.get("off_peak_activity_multiplier", 0.3) + else: + multiplier = 1.0 + + target_count = int(random.uniform(base_min, base_max) * multiplier) + + candidates = [] + for cfg in agent_configs: + agent_id = cfg.get("agent_id", 0) + + # Only exposed agents can act (relational propagation gate) + if agent_id not in exposed_agents: + continue + + active_hours = cfg.get("active_hours", list(range(8, 23))) + if current_hour not in active_hours: + continue + + activity_level = cfg.get("activity_level", 0.5) + if random.random() < activity_level: + candidates.append(cfg) + + selected = random.sample(candidates, min(target_count, len(candidates))) if candidates else [] + return selected + + +# ── LLM agent decision ──────────────────────────────────────────────────────── + +async def get_agent_action( + agent_config: Dict[str, Any], + decision_context: str, + network_summary: str, + model: Any, + round_num: int, +) -> Dict[str, Any]: + """ + Query LLM for the agent's relational action in the current round. + + Divergence from run_parallel_simulation.py: + No OASIS action loop — direct ChatAgent call. + The persona field encodes all behavioral dimensions (relational link, + trust level, financial sensitivity, reaction mode). + LLM reads this context and adapts decisions naturally — same pattern + as OASIS where persona is injected as-is into the system prompt. + + Args: + agent_config: Agent configuration dict (must contain "persona"). + decision_context: The triggering decision text. + network_summary: Recent network activity (last N rounds). + model: camel-ai model instance. + round_num: Current round number. + + Returns: + {"action_type": str, "action_args": dict} + """ + persona = agent_config.get("persona", "You are a member of a professional network.") + entity_name = agent_config.get( + "entity_name", f"Agent_{agent_config.get('agent_id', 0)}" + ) + + actions_list = "\n".join(f"- {a}" for a in PRIVATE_ACTIONS) + system_content = ( + f"{persona}\n\n" + f"Available actions (choose exactly one):\n{actions_list}\n\n" + "Respond with valid JSON only, no markdown:\n" + '{"action": "", "reasoning": "", ' + '"target_agents": []}' + ) + + user_content = ( + f"Round {round_num}.\n\n" + f"Triggering decision:\n{decision_context}\n\n" + f"Recent network activity:\n{network_summary if network_summary else 'No prior activity.'}\n\n" + "What do you do? Choose one action. Respond in JSON only." + ) + + def _sync_call() -> Dict[str, Any]: + """Synchronous LLM call — wrapped in asyncio.to_thread to avoid blocking.""" + try: + agent = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name=entity_name, + content=system_content, + ), + model=model, + ) + response = agent.step( + BaseMessage.make_user_message( + role_name="Facilitator", + content=user_content, + ) + ) + text = response.msg.content.strip() + + # Strip markdown code fence if present + if "```" in text: + parts = text.split("```") + text = parts[1] if len(parts) > 1 else parts[0] + if text.startswith("json"): + text = text[4:].strip() + + data = json.loads(text) + action_type = str(data.get("action", "DO_NOTHING")).upper() + if action_type not in PRIVATE_ACTIONS: + action_type = "DO_NOTHING" + + return { + "action_type": action_type, + "action_args": { + "reasoning": data.get("reasoning", ""), + "target_agents": data.get("target_agents", []), + }, + } + except Exception: + return {"action_type": "DO_NOTHING", "action_args": {}} + + return await asyncio.to_thread(_sync_call) + + +# ── Private IPC handler ─────────────────────────────────────────────────────── + +class PrivateIPCHandler: + """ + IPC command handler for the private impact simulation. + + Divergence from ParallelIPCHandler in run_parallel_simulation.py: + Single simulation context (no twitter_env / reddit_env). + Interview responses are generated via direct LLM call. + """ + + def __init__( + self, + simulation_dir: str, + agent_configs: List[Dict[str, Any]], + model: Any, + ): + self.simulation_dir = simulation_dir + self.agent_configs = {cfg["agent_id"]: cfg for cfg in agent_configs} + self.model = model + + self.commands_dir = os.path.join(simulation_dir, IPC_COMMANDS_DIR) + self.responses_dir = os.path.join(simulation_dir, IPC_RESPONSES_DIR) + self.status_file = os.path.join(simulation_dir, ENV_STATUS_FILE) + + os.makedirs(self.commands_dir, exist_ok=True) + os.makedirs(self.responses_dir, exist_ok=True) + + def update_status(self, status: str) -> None: + """Write current env status to disk.""" + with open(self.status_file, 'w', encoding='utf-8') as f: + json.dump({ + "status": status, + "platform": "private", + "timestamp": datetime.now().isoformat(), + }, f, ensure_ascii=False, indent=2) + + def poll_command(self) -> Optional[Dict[str, Any]]: + """Poll for pending IPC commands.""" + if not os.path.exists(self.commands_dir): + return None + + command_files = sorted( + (os.path.join(self.commands_dir, fn), os.path.getmtime(os.path.join(self.commands_dir, fn))) + for fn in os.listdir(self.commands_dir) + if fn.endswith('.json') + ) + + for filepath, _ in command_files: + try: + with open(filepath, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + continue + return None + + def send_response( + self, + command_id: str, + status: str, + result: Optional[Dict] = None, + error: Optional[str] = None, + ) -> None: + """Write response file and delete the command file.""" + response = { + "command_id": command_id, + "status": status, + "result": result, + "error": error, + "timestamp": datetime.now().isoformat(), + } + response_file = os.path.join(self.responses_dir, f"{command_id}.json") + with open(response_file, 'w', encoding='utf-8') as f: + json.dump(response, f, ensure_ascii=False, indent=2) + + command_file = os.path.join(self.commands_dir, f"{command_id}.json") + try: + os.remove(command_file) + except OSError: + pass + + async def handle_interview( + self, + command_id: str, + agent_id: int, + prompt: str, + ) -> bool: + """Interview an agent via direct LLM call.""" + cfg = self.agent_configs.get(agent_id) + if not cfg: + self.send_response(command_id, "failed", error=f"Agent {agent_id} not found") + return False + + persona = cfg.get("persona", "") + entity_name = cfg.get("entity_name", f"Agent_{agent_id}") + + def _sync_interview() -> str: + agent = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name=entity_name, + content=persona, + ), + model=self.model, + ) + response = agent.step( + BaseMessage.make_user_message(role_name="Interviewer", content=prompt) + ) + return response.msg.content + + try: + answer = await asyncio.to_thread(_sync_interview) + self.send_response(command_id, "completed", result={ + "agent_id": agent_id, + "agent_name": entity_name, + "platform": "private", + "prompt": prompt, + "response": answer, + "timestamp": datetime.now().isoformat(), + }) + print(f" Interview done: agent_id={agent_id}") + return True + except Exception as e: + self.send_response(command_id, "failed", error=str(e)) + return False + + async def handle_batch_interview( + self, + command_id: str, + interviews: List[Dict[str, Any]], + ) -> bool: + """Batch interview multiple agents.""" + tasks = [ + self.handle_interview( + f"{command_id}_{i}", + item.get("agent_id", 0), + item.get("prompt", ""), + ) + for i, item in enumerate(interviews) + ] + results_flags = await asyncio.gather(*tasks) + success_count = sum(results_flags) + + if success_count > 0: + self.send_response(command_id, "completed", result={ + "interviews_count": success_count, + }) + return True + + self.send_response(command_id, "failed", error="All interviews failed") + return False + + async def process_commands(self) -> bool: + """ + Process pending IPC commands. + + Returns: + True to keep running, False to exit. + """ + command = self.poll_command() + if not command: + return True + + command_id = command.get("command_id") + command_type = command.get("command_type") + args = command.get("args", {}) + + print(f"\nIPC command received: {command_type}, id={command_id}") + + if command_type == CommandType.INTERVIEW: + await self.handle_interview( + command_id, + args.get("agent_id", 0), + args.get("prompt", ""), + ) + return True + + if command_type == CommandType.BATCH_INTERVIEW: + await self.handle_batch_interview( + command_id, + args.get("interviews", []), + ) + return True + + if command_type == CommandType.CLOSE_ENV: + print("Close env command received") + self.send_response(command_id, "completed", result={"message": "Environment closing"}) + return False + + self.send_response(command_id, "failed", error=f"Unknown command: {command_type}") + return True + + +# ── Main simulation coroutine ───────────────────────────────────────────────── + +async def run_private_simulation( + config: Dict[str, Any], + simulation_dir: str, + action_logger: Optional[PlatformActionLogger] = None, + main_logger: Optional[SimulationLogManager] = None, + max_rounds: Optional[int] = None, + zep_updater: Optional[Any] = None, +) -> Tuple[int, List[Dict[str, Any]]]: + """ + Run the private impact simulation. + + Divergence from run_twitter_simulation / run_reddit_simulation: + - No OASIS env — direct LLM calls per agent per round + - No PlatformConfig (no recency_weight, no echo_chamber) + - Relational graph drives exposure propagation + - REACT_PRIVATELY does NOT cascade (internal reaction, invisible) + - All other non-DO_NOTHING actions cascade to cascade_influence targets + + Args: + config: Simulation config dict. + simulation_dir: Directory for log output. + action_logger: PlatformActionLogger instance ("private" platform). + main_logger: SimulationLogManager for main simulation.log. + max_rounds: Optional round cap. + zep_updater: Optional ZepGraphMemoryUpdater (reused as-is). + + Returns: + (total_actions, agent_configs) — for IPC handler initialisation. + """ + + def log(msg: str) -> None: + if main_logger: + main_logger.info(f"[Private] {msg}") + print(f"[Private] {msg}") + + log("Initializing...") + + agent_configs = config.get("agent_configs", []) + agent_names = get_agent_names_from_config(config) + time_config = config.get("time_config", {}) + + # Build relational cascade graph + relational_graph = build_relational_graph(agent_configs) + + # Agents exposed to the decision at simulation start (distance 1) + exposed_agents: Set[int] = get_initial_exposed_agents(config) + log(f"Distance-1 exposed agents: {sorted(exposed_agents)}") + + # The triggering decision text (from event_config.initial_posts) + decision_context = get_decision_context(config) + log(f"Decision context: {decision_context[:100]}...") + + model = create_model(config) + + if action_logger: + action_logger.log_simulation_start(config) + + total_actions = 0 + + # Round 0 — log the decision injection (mirrors initial_posts in OASIS) + if action_logger: + action_logger.log_round_start(0, 0) + + event_config = config.get("event_config", {}) + initial_posts = event_config.get("initial_posts", []) + initial_count = 0 + + for post in initial_posts: + poster_id = post.get("poster_agent_id", 0) + content = post.get("content", "") + poster_name = agent_names.get(poster_id, f"Agent_{poster_id}") + + if action_logger: + action_logger.log_action( + round_num=0, + agent_id=poster_id, + agent_name=poster_name, + action_type="CREATE_POST", + action_args={"content": content}, + ) + total_actions += 1 + initial_count += 1 + + if zep_updater and _ZEP_AVAILABLE: + zep_updater.add_activity_from_dict({ + "agent_id": poster_id, + "agent_name": poster_name, + "action_type": "CREATE_POST", + "action_args": {"content": content}, + "round": 0, + "timestamp": datetime.now().isoformat(), + }, platform="private") + + log(f"Decision injected: {initial_count} initial post(s)") + + if action_logger: + action_logger.log_round_end(0, initial_count) + + # Compute total rounds + total_hours = time_config.get("total_simulation_hours", 72) + minutes_per_round = time_config.get("minutes_per_round", 30) + total_rounds = (total_hours * 60) // minutes_per_round + + if max_rounds is not None and max_rounds > 0: + original_rounds = total_rounds + total_rounds = min(total_rounds, max_rounds) + if total_rounds < original_rounds: + log(f"Rounds capped: {original_rounds} → {total_rounds} (max_rounds={max_rounds})") + + # Rolling network activity log for LLM context (last 10 visible actions) + network_log: List[str] = [] + + start_time = datetime.now() + + for round_num in range(total_rounds): + if _shutdown_event and _shutdown_event.is_set(): + log(f"Shutdown signal received, stopping at round {round_num + 1}") + break + + simulated_minutes = round_num * minutes_per_round + simulated_hour = (simulated_minutes // 60) % 24 + simulated_day = simulated_minutes // (60 * 24) + 1 + + active_cfgs = get_active_agents_for_round_private( + agent_configs, exposed_agents, simulated_hour, round_num, time_config + ) + + if action_logger: + action_logger.log_round_start(round_num + 1, simulated_hour) + + if not active_cfgs: + if action_logger: + action_logger.log_round_end(round_num + 1, 0) + continue + + # Build context summary for LLM prompts this round + network_summary = "\n".join(network_log[-10:]) + + # Query all active agents concurrently + tasks = [ + get_agent_action( + cfg, decision_context, network_summary, model, round_num + 1 + ) + for cfg in active_cfgs + ] + action_results = await asyncio.gather(*tasks) + + round_action_count = 0 + newly_exposed: Set[int] = set() + + for cfg, result in zip(active_cfgs, action_results): + agent_id = cfg.get("agent_id", 0) + agent_name = agent_names.get(agent_id, f"Agent_{agent_id}") + action_type = result["action_type"] + action_args = result["action_args"] + + if action_logger: + action_logger.log_action( + round_num=round_num + 1, + agent_id=agent_id, + agent_name=agent_name, + action_type=action_type, + action_args=action_args, + ) + total_actions += 1 + round_action_count += 1 + + if zep_updater and _ZEP_AVAILABLE: + zep_updater.add_activity_from_dict({ + "agent_id": agent_id, + "agent_name": agent_name, + "action_type": action_type, + "action_args": action_args, + "round": round_num + 1, + "timestamp": datetime.now().isoformat(), + }, platform="private") + + # Rolling network log — only visible actions update context + if action_type not in ("DO_NOTHING",): + reasoning = action_args.get("reasoning", "") + entry = f"[Round {round_num + 1}] {agent_name}: {action_type}" + if reasoning: + entry += f" — {reasoning[:80]}" + network_log.append(entry) + + # Propagate exposure via cascade_influence. + # REACT_PRIVATELY is invisible externally — does NOT cascade. + # All other non-DO_NOTHING actions cascade to influenced agents. + if action_type not in ("DO_NOTHING", "REACT_PRIVATELY"): + for influenced_id in relational_graph.get(agent_id, []): + if influenced_id not in exposed_agents: + newly_exposed.add(influenced_id) + influenced_name = agent_names.get(influenced_id, f"Agent_{influenced_id}") + log(f" → {influenced_name} exposed via {agent_name}'s {action_type}") + + # Expand exposed set with newly reached agents + exposed_agents.update(newly_exposed) + + if action_logger: + action_logger.log_round_end(round_num + 1, round_action_count) + + if (round_num + 1) % 20 == 0: + progress = (round_num + 1) / total_rounds * 100 + log( + f"Day {simulated_day}, {simulated_hour:02d}:00 " + f"— Round {round_num + 1}/{total_rounds} ({progress:.1f}%) " + f"| Exposed: {len(exposed_agents)}/{len(agent_configs)}" + ) + + if action_logger: + action_logger.log_simulation_end(total_rounds, total_actions) + + elapsed = (datetime.now() - start_time).total_seconds() + log(f"Simulation complete! Elapsed: {elapsed:.1f}s, Total actions: {total_actions}") + + return total_actions, agent_configs + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +async def main() -> None: + parser = argparse.ArgumentParser(description='Private Impact Simulation') + parser.add_argument( + '--config', + type=str, + required=True, + help='Path to simulation_config.json', + ) + parser.add_argument( + '--max-rounds', + type=int, + default=None, + help='Maximum number of simulation rounds (optional cap)', + ) + parser.add_argument( + '--no-wait', + action='store_true', + default=False, + help='Close environment immediately after simulation (no IPC wait)', + ) + + args = parser.parse_args() + + global _shutdown_event + _shutdown_event = asyncio.Event() + + if not os.path.exists(args.config): + print(f"Error: config file not found: {args.config}") + sys.exit(1) + + config = load_config(args.config) + simulation_dir = os.path.dirname(args.config) or "." + wait_for_commands = not args.no_wait + + # Suppress OASIS loggers (precaution if imported transitively) + for logger_name in ("social.agent", "social.twitter", "social.rec", "oasis.env", "table"): + lg = logging.getLogger(logger_name) + lg.setLevel(logging.CRITICAL) + lg.handlers.clear() + lg.propagate = False + + log_manager = SimulationLogManager(simulation_dir) + private_logger = log_manager.get_private_logger() + + log_manager.info("=" * 60) + log_manager.info("Private Impact Simulation") + log_manager.info(f"Config: {args.config}") + log_manager.info(f"Simulation ID: {config.get('simulation_id', 'unknown')}") + log_manager.info(f"Wait mode: {'enabled' if wait_for_commands else 'disabled'}") + log_manager.info("=" * 60) + + time_config = config.get("time_config", {}) + total_hours = time_config.get("total_simulation_hours", 72) + minutes_per_round = time_config.get("minutes_per_round", 30) + config_total_rounds = (total_hours * 60) // minutes_per_round + + log_manager.info("Simulation parameters:") + log_manager.info(f" - Total simulated duration: {total_hours}h") + log_manager.info(f" - Minutes per round: {minutes_per_round}") + log_manager.info(f" - Config total rounds: {config_total_rounds}") + if args.max_rounds: + log_manager.info(f" - Round cap: {args.max_rounds}") + log_manager.info(f" - Agent count: {len(config.get('agent_configs', []))}") + log_manager.info("Log structure:") + log_manager.info(f" - Main log: simulation.log") + log_manager.info(f" - Private actions: private/actions.jsonl") + log_manager.info("=" * 60) + + start_time = datetime.now() + + total_actions, agent_configs = await run_private_simulation( + config=config, + simulation_dir=simulation_dir, + action_logger=private_logger, + main_logger=log_manager, + max_rounds=args.max_rounds, + ) + + total_elapsed = (datetime.now() - start_time).total_seconds() + log_manager.info("=" * 60) + log_manager.info(f"Simulation loop complete! Elapsed: {total_elapsed:.1f}s") + + if wait_for_commands: + log_manager.info("") + log_manager.info("=" * 60) + log_manager.info("Waiting for commands — environment active") + log_manager.info("Supported: interview, batch_interview, close_env") + log_manager.info("=" * 60) + + model = create_model(config) + ipc_handler = PrivateIPCHandler( + simulation_dir=simulation_dir, + agent_configs=agent_configs, + model=model, + ) + ipc_handler.update_status("alive") + + try: + while not _shutdown_event.is_set(): + should_continue = await ipc_handler.process_commands() + if not should_continue: + break + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=0.5) + break + except asyncio.TimeoutError: + pass + except KeyboardInterrupt: + print("\nInterrupt received") + except asyncio.CancelledError: + print("\nTask cancelled") + except Exception as e: + print(f"\nCommand processing error: {e}") + + log_manager.info("\nShutting down...") + ipc_handler.update_status("stopped") + + log_manager.info("=" * 60) + log_manager.info("All done!") + log_manager.info(f" - {os.path.join(simulation_dir, 'simulation.log')}") + log_manager.info(f" - {os.path.join(simulation_dir, 'private', 'actions.jsonl')}") + log_manager.info("=" * 60) + + +def setup_signal_handlers() -> None: + """ + Register SIGTERM/SIGINT handlers. + Same pattern as run_parallel_simulation.py — sets _shutdown_event + instead of calling sys.exit() directly to allow graceful cleanup. + """ + def signal_handler(signum: int, frame: Any) -> None: + global _cleanup_done + sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT" + print(f"\n{sig_name} received, shutting down...") + + if not _cleanup_done: + _cleanup_done = True + if _shutdown_event: + _shutdown_event.set() + else: + print("Forced exit...") + sys.exit(1) + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + +if __name__ == "__main__": + setup_signal_handlers() + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nProgram interrupted") + except SystemExit: + pass + finally: + try: + from multiprocessing import resource_tracker + resource_tracker._resource_tracker._stop() + except Exception: + pass + print("Simulation process exited") diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 637f1dfa..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,14 +0,0 @@ -services: - mirofish: - image: ghcr.io/666ghj/mirofish:latest - # 加速镜像(如拉取缓慢可替换上方地址) - # image: ghcr.nju.edu.cn/666ghj/mirofish:latest - container_name: mirofish - env_file: - - .env - ports: - - "3000:3000" - - "5001:5001" - restart: unless-stopped - volumes: - - ./backend/uploads:/app/backend/uploads \ No newline at end of file diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 3e56d752..fdab7ac4 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1435,7 +1435,6 @@ "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", "license": "ISC", - "peer": true, "engines": { "node": ">=12" } @@ -1913,7 +1912,6 @@ "integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -2053,7 +2051,6 @@ "integrity": "sha512-ITcnkFeR3+fI8P1wMgItjGrR10170d8auB4EpMLPqmx6uxElH3a/hHGQabSHKdqd4FXWO1nFIp9rRn7JQ34ACQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -2128,7 +2125,6 @@ "resolved": "https://registry.npmjs.org/vue/-/vue-3.5.25.tgz", "integrity": "sha512-YLVdgv2K13WJ6n+kD5owehKtEXwdwXuj2TTyJMsO7pSeKw2bfRNZGjhB7YzrpbMYj5b5QsUebHpOqR3R3ziy/g==", "license": "MIT", - "peer": true, "dependencies": { "@vue/compiler-dom": "3.5.25", "@vue/compiler-sfc": "3.5.25", diff --git a/frontend/src/api/index.js b/frontend/src/api/index.js index e840e116..e2d4b9e0 100644 --- a/frontend/src/api/index.js +++ b/frontend/src/api/index.js @@ -3,7 +3,7 @@ import i18n from '../i18n' // 创建axios实例 const service = axios.create({ - baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:5001', + baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:9902', timeout: 300000, // 5分钟超时(本体生成可能需要较长时间) headers: { 'Content-Type': 'application/json' diff --git a/frontend/src/api/private.js b/frontend/src/api/private.js new file mode 100644 index 00000000..eb6590dc --- /dev/null +++ b/frontend/src/api/private.js @@ -0,0 +1,22 @@ +import service, { requestWithRetry } from './index' + +export const preparePrivateSimulation = (data) => + requestWithRetry(() => service.post('/api/private-impact/prepare', data), 3, 2000) + +export const startPrivateSimulation = (data) => + requestWithRetry(() => service.post('/api/private-impact/start', data), 3, 1000) + +export const getPrivateStatus = (simId) => + service.get(`/api/private-impact/status/${simId}`) + +export const stopPrivateSimulation = (simId) => + service.post(`/api/private-impact/stop/${simId}`) + +export const getPrivateActions = (simId, params = {}) => + service.get(`/api/private-impact/actions/${simId}`, { params }) + +export const generatePrivateReport = (simId, data = {}) => + requestWithRetry(() => service.post(`/api/private-impact/report/${simId}`, data), 3, 1000) + +export const cleanupPrivateSimulation = (simId) => + service.delete(`/api/private-impact/cleanup/${simId}`) diff --git a/frontend/src/components/ModeSelector.vue b/frontend/src/components/ModeSelector.vue new file mode 100644 index 00000000..045e1848 --- /dev/null +++ b/frontend/src/components/ModeSelector.vue @@ -0,0 +1,222 @@ + + + + + diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 62d23201..3b1b8b2b 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -5,6 +5,7 @@ import SimulationView from '../views/SimulationView.vue' import SimulationRunView from '../views/SimulationRunView.vue' import ReportView from '../views/ReportView.vue' import InteractionView from '../views/InteractionView.vue' +import PrivateImpactView from '../views/PrivateImpactView.vue' const routes = [ { @@ -41,6 +42,12 @@ const routes = [ name: 'Interaction', component: InteractionView, props: true + }, + { + path: '/private/:projectId', + name: 'PrivateImpact', + component: PrivateImpactView, + props: true } ] diff --git a/frontend/src/views/Home.vue b/frontend/src/views/Home.vue index ca7ef6ff..97b7216e 100644 --- a/frontend/src/views/Home.vue +++ b/frontend/src/views/Home.vue @@ -125,6 +125,9 @@
+
+ +
@@ -216,9 +219,22 @@ import { ref, computed } from 'vue' import { useRouter } from 'vue-router' import HistoryDatabase from '../components/HistoryDatabase.vue' import LanguageSwitcher from '../components/LanguageSwitcher.vue' +import ModeSelector from '../components/ModeSelector.vue' const router = useRouter() +// Mode sélectionné (public | private) +const selectedMode = ref(null) + +const handleModeSelected = (mode) => { + selectedMode.value = mode + if (mode === 'private') { + sessionStorage.setItem('pendingSimMode', 'private') + } else { + sessionStorage.removeItem('pendingSimMode') + } +} + // 表单数据 const formData = ref({ simulationRequirement: '' @@ -672,6 +688,10 @@ const startSimulation = () => { flex: 1.2; } +.mode-selector-wrapper { + margin-bottom: 20px; +} + .console-box { border: 1px solid #CCC; /* 外部实线 */ padding: 8px; /* 内边距形成双重边框感 */ diff --git a/frontend/src/views/MainView.vue b/frontend/src/views/MainView.vue index 513c70d8..d9c9ce5a 100644 --- a/frontend/src/views/MainView.vue +++ b/frontend/src/views/MainView.vue @@ -215,6 +215,13 @@ const handleNewProject = async () => { currentProjectId.value = res.data.project_id projectData.value = res.data + const pendingMode = sessionStorage.getItem('pendingSimMode') + sessionStorage.removeItem('pendingSimMode') + if (pendingMode === 'private') { + router.push(`/private/${res.data.project_id}`) + return + } + router.replace({ name: 'Process', params: { projectId: res.data.project_id } }) ontologyProgress.value = null addLog(`Ontology generated successfully for project ${res.data.project_id}`) diff --git a/frontend/src/views/PrivateImpactView.vue b/frontend/src/views/PrivateImpactView.vue new file mode 100644 index 00000000..00e37df6 --- /dev/null +++ b/frontend/src/views/PrivateImpactView.vue @@ -0,0 +1,1554 @@ + + + + + diff --git a/frontend/vite.config.js b/frontend/vite.config.js index 8f1e4c11..a99ad266 100644 --- a/frontend/vite.config.js +++ b/frontend/vite.config.js @@ -12,11 +12,11 @@ export default defineConfig({ } }, server: { - port: 3000, + port: 9901, open: true, proxy: { '/api': { - target: 'http://localhost:5001', + target: 'http://localhost:9902', changeOrigin: true, secure: false }