diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..43711aa2 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,86 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +MiroFish is a multi-agent swarm intelligence prediction engine. It builds knowledge graphs from seed data, simulates thousands of AI agents interacting on virtual Twitter/Reddit platforms (via CAMEL-OASIS), and generates analytical reports — all to predict outcomes of real-world scenarios. + +## Commands + +### Setup +```bash +npm run setup:all # Install all dependencies (frontend + backend) +npm run setup # Frontend npm install only +npm run setup:backend # Backend: uv sync (Python deps) +``` + +### Development +```bash +npm run dev # Run backend + frontend concurrently +npm run backend # Backend only: Flask on port 5001 +npm run frontend # Frontend only: Vite on port 3000 +``` + +### Build +```bash +npm run build # Build frontend (Vite) +``` + +### Backend (Python) +```bash +cd backend && uv run python run.py # Start Flask server +cd backend && uv run python -m pytest # Run tests (if any) +``` + +### Docker +```bash +docker-compose up # Full stack via Docker +``` + +## Architecture + +### Stack +- **Backend**: Python ≥3.11 Flask 3.0, managed by `uv` +- **Frontend**: Vue 3 + Vite, port 3000; proxies `/api` → port 5001 +- **LLM**: OpenAI SDK-compatible (default: Qwen via `dashscope`; also works with GLM, OpenAI) +- **Memory/Graph**: Zep Cloud (knowledge graph for entity storage and retrieval) +- **Simulation**: CAMEL-OASIS (multi-agent Twitter + Reddit simulation) +- **Visualization**: D3.js + +### Required Environment Variables +Copy `.env.example` to `.env`: +``` +LLM_API_KEY # Required +LLM_BASE_URL # Default: https://dashscope.aliyuncs.com/compatible-mode/v1 +LLM_MODEL_NAME # Default: qwen-plus +ZEP_API_KEY # Required (Zep Cloud) +``` + +### 5-Step Pipeline +The core workflow is a sequential 
async pipeline: +1. **Graph Build** — Upload files → LLM extracts ontology → Zep Cloud builds knowledge graph +2. **Env Setup** — Read Zep entities → Generate OASIS agent profiles (AI personalities) +3. **Simulation** — CAMEL-OASIS runs agents on dual platforms (Twitter + Reddit) in parallel +4. **Report** — ReportAgent (ReACT loop) queries graph with tools: `SearchResult`, `InsightForge`, `Panorama`, `Interview` +5. **Interaction** — Chat with simulated agents or the ReportAgent + +### Backend Structure (`backend/app/`) +- `api/` — Flask blueprints: `graph_bp`, `simulation_bp`, `report_bp` +- `services/` — Core logic: graph building, simulation runner, report agent, Zep tools +- `models/` — `Project` and `Task` state objects (in-memory, JSON-serializable) +- `utils/` — LLM client wrapper, file parser, retry logic, Zep pagination +- `config/config.py` — All configuration (LLM, Zep, chunking, simulation params) + +Long-running operations (ontology generation, graph build, profile generation, report generation) run as background tasks tracked via `Task` objects with progress polling. 
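The background-task pattern described above can be sketched roughly as follows. This is a minimal illustration, not the project's actual `TaskManager`: the names `InMemoryTaskRegistry`, `run_in_background`, and `poll_until_done` are hypothetical, and the `Task` fields are assumptions based on the status, progress, and message values mentioned in this document.

```python
import threading
import time
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Task:
    """Hypothetical stand-in for the project's Task state object."""
    task_id: str
    status: str = "pending"   # pending -> processing -> completed / failed
    progress: int = 0         # percentage, 0..100
    message: str = ""
    error: Optional[str] = None


class InMemoryTaskRegistry:
    """Thread-safe in-memory task store (illustrative only)."""

    def __init__(self) -> None:
        self._tasks: Dict[str, Task] = {}
        self._lock = threading.Lock()

    def create(self) -> str:
        task_id = uuid.uuid4().hex
        with self._lock:
            self._tasks[task_id] = Task(task_id=task_id)
        return task_id

    def update(self, task_id: str, **fields) -> None:
        # All fields of one update are applied under the same lock,
        # so a poller never sees a half-applied state change.
        with self._lock:
            task = self._tasks[task_id]
            for name, value in fields.items():
                setattr(task, name, value)

    def snapshot(self, task_id: str) -> dict:
        # Return a JSON-serializable copy for the polling side.
        with self._lock:
            return dict(vars(self._tasks[task_id]))


def run_in_background(registry: InMemoryTaskRegistry,
                      work: Callable[[Callable[[int, str], None]], None]) -> str:
    """Start `work` in a daemon thread; `work` receives a progress callback."""
    task_id = registry.create()

    def runner() -> None:
        registry.update(task_id, status="processing", message="started")
        try:
            work(lambda pct, msg: registry.update(task_id, progress=pct, message=msg))
            registry.update(task_id, status="completed", progress=100, message="done")
        except Exception as exc:
            registry.update(task_id, status="failed", error=str(exc), message="failed")

    threading.Thread(target=runner, daemon=True).start()
    return task_id


def poll_until_done(registry: InMemoryTaskRegistry, task_id: str,
                    interval: float = 0.01, timeout: float = 5.0) -> dict:
    """Poll the task state until it reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = registry.snapshot(task_id)
        if state["status"] in ("completed", "failed"):
            return state
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")
```

In the real application the polling side would presumably be the frontend calling a task-status endpoint rather than an in-process loop; the sketch only shows the state hand-off between the worker thread and the poller.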
+ +### Frontend Structure (`frontend/src/`) +- `views/` — Page components mapped to routes; `Process.vue` is the main 50KB workflow orchestrator +- `components/` — `Step1-5` step components + `GraphPanel.vue` (D3 graph visualization) +- `api/` — Axios services (`graph.js`, `simulation.js`, `report.js`) with 5-min timeout and exponential retry + +### Key Implementation Details +- Reasoning model outputs (e.g., MiniMax/GLM with `<think>` tags or markdown code fences) are stripped before processing — see recent fix in commit `985f89f` +- Simulation state is managed in `SimulationManager`; IPC between processes via `simulation_ipc.py` +- Interview/chat with agents uses prefix injection to suppress tool calls in responses +- Default simulation: max 10 rounds, Twitter actions include CREATE_POST/LIKE/REPOST/FOLLOW/QUOTE/DO_NOTHING; Reddit adds CREATE_COMMENT/DISLIKE diff --git a/README-EN.md b/README-EN.md new file mode 100644 index 00000000..8290ff13 --- /dev/null +++ b/README-EN.md @@ -0,0 +1,227 @@ +
+ +MiroFish Logo + +666ghj%2FMiroFish | Trendshift + +简洁通用的群体智能引擎,预测万物 +
+A Simple and Universal Swarm Intelligence Engine, Predicting Anything + +666ghj%2MiroFish | Shanda + +[![GitHub Stars](https://img.shields.io/github/stars/666ghj/MiroFish?style=flat-square&color=DAA520)](https://github.com/666ghj/MiroFish/stargazers) +[![GitHub Watchers](https://img.shields.io/github/watchers/666ghj/MiroFish?style=flat-square)](https://github.com/666ghj/MiroFish/watchers) +[![GitHub Forks](https://img.shields.io/github/forks/666ghj/MiroFish?style=flat-square)](https://github.com/666ghj/MiroFish/network) +[![Docker](https://img.shields.io/badge/Docker-Build-2496ED?style=flat-square&logo=docker&logoColor=white)](https://hub.docker.com/) +[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/666ghj/MiroFish) + +[![Discord](https://img.shields.io/badge/Discord-Join-5865F2?style=flat-square&logo=discord&logoColor=white)](http://discord.gg/ePf5aPaHnA) +[![X](https://img.shields.io/badge/X-Follow-000000?style=flat-square&logo=x&logoColor=white)](https://x.com/mirofish_ai) +[![Instagram](https://img.shields.io/badge/Instagram-Follow-E4405F?style=flat-square&logo=instagram&logoColor=white)](https://www.instagram.com/mirofish_ai/) + +[English](./README-EN.md) | [中文文档](./README.md) + +
+ +## ⚡ Overview + +**MiroFish** is a next-generation AI prediction engine powered by multi-agent technology. By extracting seed information from the real world (such as breaking news, policy drafts, or financial signals), it automatically constructs a high-fidelity parallel digital world. Within this space, thousands of intelligent agents with independent personalities, long-term memory, and behavioral logic freely interact and undergo social evolution. You can inject variables dynamically from a "God's-eye view" to precisely deduce future trajectories — **rehearse the future in a digital sandbox, and win decisions after countless simulations**. + +> You only need to: Upload seed materials (data analysis reports or interesting novel stories) and describe your prediction requirements in natural language
+> MiroFish will return: A detailed prediction report and a deeply interactive high-fidelity digital world + +### Our Vision + +MiroFish is dedicated to creating a swarm intelligence mirror that maps reality. By capturing the collective emergence triggered by individual interactions, we break through the limitations of traditional prediction: + +- **At the Macro Level**: We are a rehearsal laboratory for decision-makers, allowing policies and public relations to be tested at zero risk +- **At the Micro Level**: We are a creative sandbox for individual users. Whether you are working out the ending of a novel or exploring imaginative scenarios, everything can be fun, playful, and accessible + +From serious predictions to playful simulations, we let every "what if" see its outcome, making it possible to predict anything. + +## 🌐 Live Demo + +Visit our online demo environment to try a prediction simulation of a trending public opinion event that we have prepared for you: [mirofish-live-demo](https://666ghj.github.io/mirofish-demo/) + +## 📸 Screenshots + +
+[Image table: Screenshots 1 to 6]
+ +## 🎬 Demo Videos + +### 1. Wuhan University Public Opinion Simulation + MiroFish Project Introduction + +
+MiroFish Demo Video + +Click the image to watch the full demo video of a prediction based on the BettaFish-generated "Wuhan University Public Opinion Report" +
+ +### 2. Dream of the Red Chamber Lost Ending Simulation + +
+MiroFish Demo Video + +Click the image to watch MiroFish's deep prediction of the lost ending based on hundreds of thousands of words from the first 80 chapters of "Dream of the Red Chamber" +
+ +> **Financial Prediction**, **Political News Prediction** and more examples coming soon... + +## 🔄 Workflow + +1. **Graph Building**: Seed extraction & Individual/collective memory injection & GraphRAG construction +2. **Environment Setup**: Entity relationship extraction & Persona generation & Agent configuration injection +3. **Simulation**: Dual-platform parallel simulation & Auto-parse prediction requirements & Dynamic temporal memory updates +4. **Report Generation**: ReportAgent with rich toolset for deep interaction with post-simulation environment +5. **Deep Interaction**: Chat with any agent in the simulated world & Interact with ReportAgent + +## 🚀 Quick Start + +### Option 1: Source Code Deployment (Recommended) + +#### Prerequisites + +| Tool | Version | Description | Check Installation | +|------|---------|-------------|-------------------| +| **Node.js** | 18+ | Frontend runtime, includes npm | `node -v` | +| **Python** | ≥3.11, ≤3.12 | Backend runtime | `python --version` | +| **uv** | Latest | Python package manager | `uv --version` | +| **Neo4j** | 5.x Community | Local knowledge graph database | `neo4j --version` | + +**Install Neo4j (choose one):** + +```bash +# macOS +brew install neo4j + +# Linux (Debian/Ubuntu) +# See official docs: https://neo4j.com/docs/operations-manual/current/installation/linux/ + +# Windows / All platforms +# Download Neo4j Desktop: https://neo4j.com/download/ + +# Set password before first start, then launch +neo4j-admin dbms set-initial-password your_neo4j_password +neo4j start +``` + +#### 1. 
Configure Environment Variables + +```bash +# Copy the example configuration file +cp .env.example .env + +# Edit the .env file and fill in the required API keys +``` + +**Required Environment Variables:** + +```env +# LLM API Configuration (supports any LLM API with OpenAI SDK format) +# Recommended: Alibaba Qwen-plus model via Bailian Platform: https://bailian.console.aliyun.com/ +# High consumption, try simulations with fewer than 40 rounds first +LLM_API_KEY=your_api_key +LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 +LLM_MODEL_NAME=qwen-plus + +# Knowledge Graph — local Neo4j + Graphiti (free, no rate limits) +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=your_neo4j_password + +# Embedding model (uncomment if using Gemini API) +# EMBEDDING_MODEL=gemini-embedding-001 +``` + +> **Note:** MiroFish has migrated from Zep Cloud to local **Graphiti + Neo4j**. No third-party account required — completely free with no rate limits. + +#### 2. Install Dependencies + +```bash +# One-click installation of all dependencies (root + frontend + backend) +npm run setup:all +``` + +Or install step by step: + +```bash +# Install Node dependencies (root + frontend) +npm run setup + +# Install Python dependencies (backend, auto-creates virtual environment) +npm run setup:backend +``` + +#### 3. Start Services + +```bash +# Start both frontend and backend (run from project root) +npm run dev +``` + +**Service URLs:** +- Frontend: `http://localhost:3000` +- Backend API: `http://localhost:5001` + +**Start Individually:** + +```bash +npm run backend # Start backend only +npm run frontend # Start frontend only +``` + +### Option 2: Docker Deployment + +```bash +# 1. Configure environment variables (same as source deployment) +cp .env.example .env + +# 2. 
Pull image and start +docker compose up -d +``` + +By default this reads `.env` from the project root and maps ports `3000` (frontend) and `5001` (backend). + +> A mirror address for faster image pulls is provided in comments in `docker-compose.yml`; substitute it if needed. + +## 📬 Join the Conversation + +
+QQ Group +
+ +  + +The MiroFish team is hiring for full-time and internship positions. If you're interested in multi-agent simulation and LLM applications, feel free to send your resume to: **mirofish@shanda.com** + +## 📄 Acknowledgments + +**MiroFish has received strategic support and incubation from Shanda Group!** + +MiroFish's simulation engine is powered by **[OASIS (Open Agent Social Interaction Simulations)](https://github.com/camel-ai/oasis)**. We sincerely thank the CAMEL-AI team for their open-source contributions! + +## 📈 Project Statistics + + + + + + Star History Chart + + \ No newline at end of file diff --git a/README.md b/README.md index 4b8369f4..f98741d6 100644 --- a/README.md +++ b/README.md @@ -97,11 +97,37 @@ Click the image to watch MiroFish's deep prediction of the lost ending based on #### Prerequisites +<<<<<<< HEAD | Tool | Version | Description | Check Installation | |------|---------|-------------|-------------------| | **Node.js** | 18+ | Frontend runtime, includes npm | `node -v` | | **Python** | ≥3.11, ≤3.12 | Backend runtime | `python --version` | | **uv** | Latest | Python package manager | `uv --version` | +======= +| Tool | Version | Description | Check Installation | +|------|---------|------|---------| +| **Node.js** | 18+ | Frontend runtime, includes npm | `node -v` | +| **Python** | ≥3.11, ≤3.12 | Backend runtime | `python --version` | +| **uv** | Latest | Python package manager | `uv --version` | +| **Neo4j** | 5.x Community | Local knowledge graph database | `neo4j --version` | + +**Install Neo4j (choose the option that suits you):** + +```bash +# macOS +brew install neo4j + +# Linux (Debian/Ubuntu) +# See the official docs: https://neo4j.com/docs/operations-manual/current/installation/linux/ + +# Windows / all platforms +# Download the Desktop version: https://neo4j.com/download/ + +# Set a password before first start, then launch the service +neo4j-admin dbms set-initial-password your_neo4j_password +neo4j start +``` +>>>>>>> abhiyadav2345/feat/graphiti-neo4j-migration #### 1. 
Configure Environment Variables @@ -122,12 +148,29 @@ LLM_API_KEY=your_api_key LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 LLM_MODEL_NAME=qwen-plus +<<<<<<< HEAD # Zep Cloud Configuration # Free monthly quota is sufficient for simple usage: https://app.getzep.com/ ZEP_API_KEY=your_zep_api_key ``` #### 2. Install Dependencies +======= +# Knowledge graph configuration (local Neo4j + Graphiti, free, no rate limits) +# Install Neo4j Community Edition: https://neo4j.com/download/ +# macOS users: brew install neo4j && neo4j start +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=your_neo4j_password + +# Embedding model (uncomment if using the Gemini API) +# EMBEDDING_MODEL=gemini-embedding-001 +``` + +> **Note:** MiroFish has migrated from Zep Cloud to local **Graphiti + Neo4j**. No third-party account is required; it is completely free with no rate limits. + +#### 2. Install Dependencies +>>>>>>> abhiyadav2345/feat/graphiti-neo4j-migration ```bash # One-click installation of all dependencies (root + frontend + backend) diff --git a/backend/app/__init__.py b/backend/app/__init__.py index aba624bb..6f3345cd 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -73,8 +73,36 @@ def create_app(config_class=Config): def health(): return {'status': 'ok', 'service': 'MiroFish Backend'} + # On startup: recover any projects stuck in graph_building (task was killed by restart) + if should_log_startup: + _recover_stuck_projects() + if should_log_startup: logger.info("MiroFish Backend 启动完成") - + return app + +def _recover_stuck_projects(): + """Mark graph_building projects as completed if Neo4j already has their data.""" + from .models.project import ProjectManager, ProjectStatus + from .utils.logger import get_logger as _get_logger + _log = _get_logger('mirofish.startup') + try: + for p in ProjectManager.list_projects(): + if p.status == ProjectStatus.GRAPH_BUILDING and p.graph_id: + from .services.graphiti_adapter import _get_graphiti, _run, _neo4j_query + g = _get_graphiti() + r = _run(_neo4j_query(g, + 'MATCH (n:Entity {group_id: $gid}) RETURN count(n) AS n', + {'gid': p.graph_id} + )) + 
node_count = int(r[0]['n']) if r else 0 + if node_count > 0: + p.status = ProjectStatus.GRAPH_COMPLETED + p.graph_build_task_id = None + ProjectManager.save_project(p) + _log.info(f"Recovered stuck project {p.project_id}: {node_count} nodes found, marked graph_completed") + except Exception as e: + _get_logger('mirofish.startup').warning(f"Startup recovery failed: {e}") + diff --git a/backend/app/api/graph.py b/backend/app/api/graph.py index 759ff48b..f432269d 100644 --- a/backend/app/api/graph.py +++ b/backend/app/api/graph.py @@ -4,6 +4,7 @@ """ import os +import time import traceback import threading from flask import request, jsonify @@ -15,10 +16,15 @@ from ..services.graph_builder import GraphBuilderService from ..services.text_processor import TextProcessor from ..utils.file_parser import FileParser from ..utils.logger import get_logger -from ..utils.locale import t, get_locale, set_locale from ..models.task import TaskManager, TaskStatus from ..models.project import ProjectManager, ProjectStatus +# In-memory cache for graph data to avoid hammering Zep's rate-limited API. +# Stale cache is served instantly on 429; a background thread refreshes it. 
+_graph_data_cache: dict = {} # graph_id -> {"data": ..., "ts": float} +_graph_refresh_locks: dict = {} # graph_id -> threading.Lock (one refresh at a time) +_GRAPH_CACHE_TTL = 300 # seconds before triggering a background refresh + # 获取日志器 logger = get_logger('mirofish.api') @@ -43,9 +49,9 @@ def get_project(project_id: str): if not project: return jsonify({ "success": False, - "error": t('api.projectNotFound', id=project_id) + "error": f"项目不存在: {project_id}" }), 404 - + return jsonify({ "success": True, "data": project.to_dict() @@ -77,12 +83,12 @@ def delete_project(project_id: str): if not success: return jsonify({ "success": False, - "error": t('api.projectDeleteFailed', id=project_id) + "error": f"项目不存在或删除失败: {project_id}" }), 404 - + return jsonify({ "success": True, - "message": t('api.projectDeleted', id=project_id) + "message": f"项目已删除: {project_id}" }) @@ -96,9 +102,9 @@ def reset_project(project_id: str): if not project: return jsonify({ "success": False, - "error": t('api.projectNotFound', id=project_id) + "error": f"项目不存在: {project_id}" }), 404 - + # 重置到本体已生成状态 if project.ontology: project.status = ProjectStatus.ONTOLOGY_GENERATED @@ -112,7 +118,7 @@ def reset_project(project_id: str): return jsonify({ "success": True, - "message": t('api.projectReset', id=project_id), + "message": f"项目已重置: {project_id}", "data": project.to_dict() }) @@ -161,7 +167,7 @@ def generate_ontology(): if not simulation_requirement: return jsonify({ "success": False, - "error": t('api.requireSimulationRequirement') + "error": "请提供模拟需求描述 (simulation_requirement)" }), 400 # 获取上传的文件 @@ -169,7 +175,7 @@ def generate_ontology(): if not uploaded_files or all(not f.filename for f in uploaded_files): return jsonify({ "success": False, - "error": t('api.requireFileUpload') + "error": "请至少上传一个文档文件" }), 400 # 创建项目 @@ -204,7 +210,7 @@ def generate_ontology(): ProjectManager.delete_project(project.project_id) return jsonify({ "success": False, - "error": t('api.noDocProcessed') + "error": 
"没有成功处理任何文档,请检查文件格式" }), 400 # 保存提取的文本 @@ -285,13 +291,13 @@ def build_graph(): # 检查配置 errors = [] - if not Config.ZEP_API_KEY: - errors.append(t('api.zepApiKeyMissing')) + if not Config.NEO4J_PASSWORD: + errors.append("NEO4J未配置") if errors: logger.error(f"配置错误: {errors}") return jsonify({ "success": False, - "error": t('api.configError', details="; ".join(errors)) + "error": "配置错误: " + "; ".join(errors) }), 500 # 解析请求 @@ -302,7 +308,7 @@ def build_graph(): if not project_id: return jsonify({ "success": False, - "error": t('api.requireProjectId') + "error": "请提供 project_id" }), 400 # 获取项目 @@ -310,22 +316,22 @@ def build_graph(): if not project: return jsonify({ "success": False, - "error": t('api.projectNotFound', id=project_id) + "error": f"项目不存在: {project_id}" }), 404 - + # 检查项目状态 force = data.get('force', False) # 强制重新构建 if project.status == ProjectStatus.CREATED: return jsonify({ "success": False, - "error": t('api.ontologyNotGenerated') + "error": "项目尚未生成本体,请先调用 /ontology/generate" }), 400 if project.status == ProjectStatus.GRAPH_BUILDING and not force: return jsonify({ "success": False, - "error": t('api.graphBuilding'), + "error": "图谱正在构建中,请勿重复提交。如需强制重建,请添加 force: true", "task_id": project.graph_build_task_id }), 400 @@ -350,7 +356,7 @@ def build_graph(): if not text: return jsonify({ "success": False, - "error": t('api.textNotFound') + "error": "未找到提取的文本内容" }), 400 # 获取本体 @@ -358,7 +364,7 @@ def build_graph(): if not ontology: return jsonify({ "success": False, - "error": t('api.ontologyNotFound') + "error": "未找到本体定义" }), 400 # 创建异步任务 @@ -371,28 +377,24 @@ def build_graph(): project.graph_build_task_id = task_id ProjectManager.save_project(project) - # Capture locale before spawning background thread - current_locale = get_locale() - # 启动后台任务 def build_task(): - set_locale(current_locale) build_logger = get_logger('mirofish.build') try: build_logger.info(f"[{task_id}] 开始构建图谱...") task_manager.update_task( task_id, status=TaskStatus.PROCESSING, - 
message=t('progress.initGraphService') + message="初始化图谱构建服务..." ) # 创建图谱构建服务 - builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) + builder = GraphBuilderService() # 分块 task_manager.update_task( task_id, - message=t('progress.textChunking'), + message="文本分块中...", progress=5 ) chunks = TextProcessor.split_text( @@ -405,7 +407,7 @@ def build_graph(): # 创建图谱 task_manager.update_task( task_id, - message=t('progress.creatingZepGraph'), + message="创建Zep图谱...", progress=10 ) graph_id = builder.create_graph(name=graph_name) @@ -417,7 +419,7 @@ def build_graph(): # 设置本体 task_manager.update_task( task_id, - message=t('progress.settingOntology'), + message="设置本体定义...", progress=15 ) builder.set_ontology(graph_id, ontology) @@ -431,23 +433,36 @@ def build_graph(): progress=progress ) - task_manager.update_task( - task_id, - message=t('progress.addingChunks', count=total_chunks), - progress=15 - ) - + # Count already-processed episodes to resume after a restart + from app.services.graphiti_adapter import _get_graphiti, _run, _neo4j_query + try: + g = _get_graphiti() + ep_count = _run(_neo4j_query(g, + 'MATCH (e:Episodic {group_id: $gid}) RETURN count(e) AS n', + {'gid': graph_id} + )) + already_done = int(ep_count[0]['n']) if ep_count else 0 + except Exception: + already_done = 0 + + skip_chunks = already_done + remaining = total_chunks - skip_chunks + msg_start = (f"断点续传:跳过 {skip_chunks} 个已处理块,继续处理 {remaining} 块..." 
+ if skip_chunks > 0 else f"开始添加 {total_chunks} 个文本块...") + task_manager.update_task(task_id, message=msg_start, progress=15) + episode_uuids = builder.add_text_batches( - graph_id, + graph_id, chunks, batch_size=3, - progress_callback=add_progress_callback + progress_callback=add_progress_callback, + skip_chunks=skip_chunks, ) # 等待Zep处理完成(查询每个episode的processed状态) task_manager.update_task( task_id, - message=t('progress.waitingZepProcess'), + message="等待Zep处理数据...", progress=55 ) @@ -464,7 +479,7 @@ def build_graph(): # 获取图谱数据 task_manager.update_task( task_id, - message=t('progress.fetchingGraphData'), + message="获取图谱数据...", progress=95 ) graph_data = builder.get_graph_data(graph_id) @@ -481,7 +496,7 @@ def build_graph(): task_manager.update_task( task_id, status=TaskStatus.COMPLETED, - message=t('progress.graphBuildComplete'), + message="图谱构建完成", progress=100, result={ "project_id": project_id, @@ -504,7 +519,7 @@ def build_graph(): task_manager.update_task( task_id, status=TaskStatus.FAILED, - message=t('progress.buildFailed', error=str(e)), + message=f"构建失败: {str(e)}", error=traceback.format_exc() ) @@ -517,7 +532,7 @@ def build_graph(): "data": { "project_id": project_id, "task_id": task_id, - "message": t('api.graphBuildStarted', taskId=task_id) + "message": "图谱构建任务已启动,请通过 /task/{task_id} 查询进度" } }) @@ -541,7 +556,7 @@ def get_task(task_id: str): if not task: return jsonify({ "success": False, - "error": t('api.taskNotFound', id=task_id) + "error": f"任务不存在: {task_id}" }), 404 return jsonify({ @@ -566,32 +581,59 @@ def list_tasks(): # ============== 图谱数据接口 ============== +def _refresh_graph_cache(graph_id: str): + """Background thread: fetch graph data from Neo4j and update cache.""" + lock = _graph_refresh_locks.setdefault(graph_id, threading.Lock()) + if not lock.acquire(blocking=False): + return # another refresh already in progress + try: + # Look up ontology from the project that owns this graph_id + ontology = None + for project in 
ProjectManager.list_projects(): + if project.graph_id == graph_id and project.ontology: + ontology = project.ontology + break + + builder = GraphBuilderService() + graph_data = builder.get_graph_data(graph_id, ontology=ontology) + _graph_data_cache[graph_id] = {"data": graph_data, "ts": time.time()} + logger.info(f"Graph cache refreshed for {graph_id}") + except Exception as e: + logger.warning(f"Background graph cache refresh failed for {graph_id}: {str(e)[:100]}") + finally: + lock.release() + + @graph_bp.route('/data/', methods=['GET']) def get_graph_data(graph_id: str): """ - 获取图谱数据(节点和边) + 获取图谱数据(节点和边)。 + - 有缓存且未过期:直接返回缓存,不调用 Zep + - 有缓存但已过期:立即返回旧缓存,后台异步刷新 + - 无缓存:后台线程拉取,返回 202 让前端稍后重试 """ - try: - if not Config.ZEP_API_KEY: - return jsonify({ - "success": False, - "error": t('api.zepApiKeyMissing') - }), 500 - - builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) - graph_data = builder.get_graph_data(graph_id) - - return jsonify({ - "success": True, - "data": graph_data - }) - - except Exception as e: - return jsonify({ - "success": False, - "error": str(e), - "traceback": traceback.format_exc() - }), 500 + if not Config.NEO4J_PASSWORD: + return jsonify({"success": False, "error": "NEO4J未配置"}), 500 + + cached = _graph_data_cache.get(graph_id) + age = time.time() - cached["ts"] if cached else None + + if cached and age < _GRAPH_CACHE_TTL: + # Fresh cache — return immediately + return jsonify({"success": True, "data": cached["data"], "cached": True}) + + if cached: + # Stale cache — serve it immediately, refresh in background + threading.Thread(target=_refresh_graph_cache, args=(graph_id,), daemon=True).start() + return jsonify({"success": True, "data": cached["data"], "cached": True, "stale": True}) + + # No cache at all — kick off background fetch, tell frontend to retry + threading.Thread(target=_refresh_graph_cache, args=(graph_id,), daemon=True).start() + return jsonify({ + "success": False, + "error": "Graph data is loading, please retry in a 
moment.", + "retry": True + }), 202 @graph_bp.route('/delete/', methods=['DELETE']) @@ -600,18 +642,18 @@ def delete_graph(graph_id: str): 删除Zep图谱 """ try: - if not Config.ZEP_API_KEY: + if not Config.NEO4J_PASSWORD: return jsonify({ "success": False, - "error": t('api.zepApiKeyMissing') + "error": "NEO4J未配置" }), 500 - builder = GraphBuilderService(api_key=Config.ZEP_API_KEY) + builder = GraphBuilderService() builder.delete_graph(graph_id) return jsonify({ "success": True, - "message": t('api.graphDeleted', id=graph_id) + "message": f"图谱已删除: {graph_id}" }) except Exception as e: diff --git a/backend/app/api/simulation.py b/backend/app/api/simulation.py index 3a8e1e3f..77acc1a9 100644 --- a/backend/app/api/simulation.py +++ b/backend/app/api/simulation.py @@ -14,7 +14,6 @@ from ..services.oasis_profile_generator import OasisProfileGenerator from ..services.simulation_manager import SimulationManager, SimulationStatus from ..services.simulation_runner import SimulationRunner, RunnerStatus from ..utils.logger import get_logger -from ..utils.locale import t, get_locale, set_locale from ..models.project import ProjectManager logger = get_logger('mirofish.api.simulation') @@ -57,10 +56,10 @@ def get_graph_entities(graph_id: str): enrich: 是否获取相关边信息(默认true) """ try: - if not Config.ZEP_API_KEY: + if not Config.NEO4J_PASSWORD: return jsonify({ "success": False, - "error": t('api.zepApiKeyMissing') + "error": "NEO4J未配置" }), 500 entity_types_str = request.args.get('entity_types', '') @@ -94,10 +93,10 @@ def get_graph_entities(graph_id: str): def get_entity_detail(graph_id: str, entity_uuid: str): """获取单个实体的详细信息""" try: - if not Config.ZEP_API_KEY: + if not Config.NEO4J_PASSWORD: return jsonify({ "success": False, - "error": t('api.zepApiKeyMissing') + "error": "NEO4J未配置" }), 500 reader = ZepEntityReader() @@ -106,7 +105,7 @@ def get_entity_detail(graph_id: str, entity_uuid: str): if not entity: return jsonify({ "success": False, - "error": t('api.entityNotFound', 
id=entity_uuid) + "error": f"实体不存在: {entity_uuid}" }), 404 return jsonify({ @@ -127,10 +126,10 @@ def get_entity_detail(graph_id: str, entity_uuid: str): def get_entities_by_type(graph_id: str, entity_type: str): """获取指定类型的所有实体""" try: - if not Config.ZEP_API_KEY: + if not Config.NEO4J_PASSWORD: return jsonify({ "success": False, - "error": t('api.zepApiKeyMissing') + "error": "NEO4J未配置" }), 500 enrich = request.args.get('enrich', 'true').lower() == 'true' @@ -198,21 +197,21 @@ def create_simulation(): if not project_id: return jsonify({ "success": False, - "error": t('api.requireProjectId') + "error": "请提供 project_id" }), 400 project = ProjectManager.get_project(project_id) if not project: return jsonify({ "success": False, - "error": t('api.projectNotFound', id=project_id) + "error": f"项目不存在: {project_id}" }), 404 graph_id = data.get('graph_id') or project.graph_id if not graph_id: return jsonify({ "success": False, - "error": t('api.graphNotBuilt') + "error": "项目尚未构建图谱,请先调用 /api/graph/build" }), 400 manager = SimulationManager() @@ -409,7 +408,7 @@ def prepare_simulation(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 manager = SimulationManager() @@ -418,7 +417,7 @@ def prepare_simulation(): if not state: return jsonify({ "success": False, - "error": t('api.simulationNotFound', id=simulation_id) + "error": f"模拟不存在: {simulation_id}" }), 404 # 检查是否强制重新生成 @@ -437,7 +436,7 @@ def prepare_simulation(): "data": { "simulation_id": simulation_id, "status": "ready", - "message": t('api.alreadyPrepared'), + "message": "已有完成的准备工作,无需重复生成", "already_prepared": True, "prepare_info": prepare_info } @@ -450,7 +449,7 @@ def prepare_simulation(): if not project: return jsonify({ "success": False, - "error": t('api.projectNotFound', id=state.project_id) + "error": f"项目不存在: {state.project_id}" }), 404 # 获取模拟需求 @@ -458,7 +457,7 @@ def prepare_simulation(): if not simulation_requirement: 
return jsonify({ "success": False, - "error": t('api.projectMissingRequirement') + "error": "项目缺少模拟需求描述 (simulation_requirement)" }), 400 # 获取文档文本 @@ -501,18 +500,14 @@ def prepare_simulation(): state.status = SimulationStatus.PREPARING manager._save_simulation_state(state) - # Capture locale before spawning background thread - current_locale = get_locale() - # 定义后台任务 def run_prepare(): - set_locale(current_locale) try: task_manager.update_task( task_id, status=TaskStatus.PROCESSING, progress=0, - message=t('progress.startPreparingEnv') + message="开始准备模拟环境..." ) # 准备模拟(带进度回调) @@ -533,10 +528,10 @@ def prepare_simulation(): # 构建详细进度信息 stage_names = { - "reading": t('progress.readingGraphEntities'), - "generating_profiles": t('progress.generatingProfiles'), - "generating_config": t('progress.generatingSimConfig'), - "copying_scripts": t('progress.preparingScripts') + "reading": "读取图谱实体", + "generating_profiles": "生成Agent人设", + "generating_config": "生成模拟配置", + "copying_scripts": "准备模拟脚本" } stage_index = list(stage_weights.keys()).index(stage) + 1 if stage in stage_weights else 1 @@ -617,7 +612,7 @@ def prepare_simulation(): "simulation_id": simulation_id, "task_id": task_id, "status": "preparing", - "message": t('api.prepareStarted'), + "message": "准备任务已启动,请通过 /api/simulation/prepare/status 查询进度", "already_prepared": False, "expected_entities_count": state.entities_count, # 预期的Agent总数 "entity_types": state.entity_types # 实体类型列表 @@ -685,7 +680,7 @@ def get_prepare_status(): "simulation_id": simulation_id, "status": "ready", "progress": 100, - "message": t('api.alreadyPrepared'), + "message": "已有完成的准备工作", "already_prepared": True, "prepare_info": prepare_info } @@ -701,13 +696,13 @@ def get_prepare_status(): "simulation_id": simulation_id, "status": "not_started", "progress": 0, - "message": t('api.notStartedPrepare'), + "message": "尚未开始准备,请调用 /api/simulation/prepare 开始", "already_prepared": False } }) return jsonify({ "success": False, - "error": 
t('api.requireTaskOrSimId') + "error": "请提供 task_id 或 simulation_id" }), 400 task_manager = TaskManager() @@ -725,7 +720,7 @@ def get_prepare_status(): "task_id": task_id, "status": "ready", "progress": 100, - "message": t('api.taskCompletedPrepared'), + "message": "任务已完成(准备工作已存在)", "already_prepared": True, "prepare_info": prepare_info } @@ -733,7 +728,7 @@ def get_prepare_status(): return jsonify({ "success": False, - "error": t('api.taskNotFound', id=task_id) + "error": f"任务不存在: {task_id}" }), 404 task_dict = task.to_dict() @@ -762,7 +757,7 @@ def get_simulation(simulation_id: str): if not state: return jsonify({ "success": False, - "error": t('api.simulationNotFound', id=simulation_id) + "error": f"模拟不存在: {simulation_id}" }), 404 result = state.to_dict() @@ -1066,7 +1061,7 @@ def get_simulation_profiles_realtime(simulation_id: str): if not os.path.exists(sim_dir): return jsonify({ "success": False, - "error": t('api.simulationNotFound', id=simulation_id) + "error": f"模拟不存在: {simulation_id}" }), 404 # 确定文件路径 @@ -1169,7 +1164,7 @@ def get_simulation_config_realtime(simulation_id: str): if not os.path.exists(sim_dir): return jsonify({ "success": False, - "error": t('api.simulationNotFound', id=simulation_id) + "error": f"模拟不存在: {simulation_id}" }), 404 # 配置文件路径 @@ -1274,7 +1269,7 @@ def get_simulation_config(simulation_id: str): if not config: return jsonify({ "success": False, - "error": t('api.configNotFound') + "error": f"模拟配置不存在,请先调用 /prepare 接口" }), 404 return jsonify({ @@ -1302,7 +1297,7 @@ def download_simulation_config(simulation_id: str): if not os.path.exists(config_path): return jsonify({ "success": False, - "error": t('api.configFileNotFound') + "error": "配置文件不存在,请先调用 /prepare 接口" }), 404 return send_file( @@ -1346,7 +1341,7 @@ def download_simulation_script(script_name: str): if script_name not in allowed_scripts: return jsonify({ "success": False, - "error": t('api.unknownScript', name=script_name, allowed=allowed_scripts) + "error": f"未知脚本: 
{script_name},可选: {allowed_scripts}" }), 400 script_path = os.path.join(scripts_dir, script_name) @@ -1354,7 +1349,7 @@ def download_simulation_script(script_name: str): if not os.path.exists(script_path): return jsonify({ "success": False, - "error": t('api.scriptFileNotFound', name=script_name) + "error": f"脚本文件不存在: {script_name}" }), 404 return send_file( @@ -1394,7 +1389,7 @@ def generate_profiles(): if not graph_id: return jsonify({ "success": False, - "error": t('api.requireGraphId') + "error": "请提供 graph_id" }), 400 entity_types = data.get('entity_types') @@ -1411,7 +1406,7 @@ def generate_profiles(): if filtered.filtered_count == 0: return jsonify({ "success": False, - "error": t('api.noMatchingEntities') + "error": "没有找到符合条件的实体" }), 400 generator = OasisProfileGenerator() @@ -1496,7 +1491,7 @@ def start_simulation(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 platform = data.get('platform', 'parallel') @@ -1511,18 +1506,18 @@ def start_simulation(): if max_rounds <= 0: return jsonify({ "success": False, - "error": t('api.maxRoundsPositive') + "error": "max_rounds 必须是正整数" }), 400 except (ValueError, TypeError): return jsonify({ "success": False, - "error": t('api.maxRoundsInvalid') + "error": "max_rounds 必须是有效的整数" }), 400 if platform not in ['twitter', 'reddit', 'parallel']: return jsonify({ "success": False, - "error": t('api.invalidPlatform', platform=platform) + "error": f"无效的平台类型: {platform},可选: twitter/reddit/parallel" }), 400 # 检查模拟是否已准备好 @@ -1532,7 +1527,7 @@ def start_simulation(): if not state: return jsonify({ "success": False, - "error": t('api.simulationNotFound', id=simulation_id) + "error": f"模拟不存在: {simulation_id}" }), 404 force_restarted = False @@ -1559,7 +1554,7 @@ def start_simulation(): else: return jsonify({ "success": False, - "error": t('api.simRunningForceHint') + "error": f"模拟正在运行中,请先调用 /stop 接口停止,或使用 force=true 强制重新开始" }), 400 # 
如果是强制模式,清理运行日志 @@ -1578,7 +1573,7 @@ def start_simulation(): # 准备工作未完成 return jsonify({ "success": False, - "error": t('api.simNotReady', status=state.status.value) + "error": f"模拟未准备好,当前状态: {state.status.value},请先调用 /prepare 接口" }), 400 # 获取图谱ID(用于图谱记忆更新) @@ -1595,7 +1590,7 @@ def start_simulation(): if not graph_id: return jsonify({ "success": False, - "error": t('api.graphIdRequiredForMemory') + "error": "启用图谱记忆更新需要有效的 graph_id,请确保项目已构建图谱" }), 400 logger.info(f"启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}") @@ -1668,7 +1663,7 @@ def stop_simulation(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 run_state = SimulationRunner.stop_simulation(simulation_id) @@ -2016,7 +2011,7 @@ def get_simulation_posts(simulation_id: str): "platform": platform, "count": 0, "posts": [], - "message": t('api.dbNotExist') + "message": "数据库不存在,模拟可能尚未运行" } }) @@ -2202,33 +2197,33 @@ def interview_agent(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 if agent_id is None: return jsonify({ "success": False, - "error": t('api.requireAgentId') + "error": "请提供 agent_id" }), 400 if not prompt: return jsonify({ "success": False, - "error": t('api.requirePrompt') + "error": "请提供 prompt(采访问题)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, - "error": t('api.invalidInterviewPlatform') + "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, - "error": t('api.envNotRunning') + "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化prompt,添加前缀避免Agent调用工具 @@ -2256,7 +2251,7 @@ def interview_agent(): except TimeoutError as e: return jsonify({ "success": False, - "error": t('api.interviewTimeout', error=str(e)) + "error": f"等待Interview响应超时: 
{str(e)}" }), 504 except Exception as e: @@ -2323,20 +2318,20 @@ def interview_agents_batch(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 if not interviews or not isinstance(interviews, list): return jsonify({ "success": False, - "error": t('api.requireInterviews') + "error": "请提供 interviews(采访列表)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, - "error": t('api.invalidInterviewPlatform') + "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 验证每个采访项 @@ -2344,26 +2339,26 @@ def interview_agents_batch(): if 'agent_id' not in interview: return jsonify({ "success": False, - "error": t('api.interviewListMissingAgentId', index=i+1) + "error": f"采访列表第{i+1}项缺少 agent_id" }), 400 if 'prompt' not in interview: return jsonify({ "success": False, - "error": t('api.interviewListMissingPrompt', index=i+1) + "error": f"采访列表第{i+1}项缺少 prompt" }), 400 # 验证每项的platform(如果有) item_platform = interview.get('platform') if item_platform and item_platform not in ("twitter", "reddit"): return jsonify({ "success": False, - "error": t('api.interviewListInvalidPlatform', index=i+1) + "error": f"采访列表第{i+1}项的platform只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, - "error": t('api.envNotRunning') + "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化每个采访项的prompt,添加前缀避免Agent调用工具 @@ -2394,7 +2389,7 @@ def interview_agents_batch(): except TimeoutError as e: return jsonify({ "success": False, - "error": t('api.batchInterviewTimeout', error=str(e)) + "error": f"等待批量Interview响应超时: {str(e)}" }), 504 except Exception as e: @@ -2450,27 +2445,27 @@ def interview_all_agents(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 if not prompt: return jsonify({ "success": False, - 
"error": t('api.requirePrompt') + "error": "请提供 prompt(采访问题)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, - "error": t('api.invalidInterviewPlatform') + "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, - "error": t('api.envNotRunning') + "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化prompt,添加前缀避免Agent调用工具 @@ -2497,7 +2492,7 @@ def interview_all_agents(): except TimeoutError as e: return jsonify({ "success": False, - "error": t('api.globalInterviewTimeout', error=str(e)) + "error": f"等待全局Interview响应超时: {str(e)}" }), 504 except Exception as e: @@ -2554,7 +2549,7 @@ def get_interview_history(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 history = SimulationRunner.get_interview_history( @@ -2613,7 +2608,7 @@ def get_env_status(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 env_alive = SimulationRunner.check_env_alive(simulation_id) @@ -2622,9 +2617,9 @@ def get_env_status(): env_status = SimulationRunner.get_env_status_detail(simulation_id) if env_alive: - message = t('api.envRunning') + message = "环境正在运行,可以接收Interview命令" else: - message = t('api.envNotRunningShort') + message = "环境未运行或已关闭" return jsonify({ "success": True, @@ -2681,7 +2676,7 @@ def close_simulation_env(): if not simulation_id: return jsonify({ "success": False, - "error": t('api.requireSimulationId') + "error": "请提供 simulation_id" }), 400 result = SimulationRunner.close_simulation_env( diff --git a/backend/app/config.py b/backend/app/config.py index 953dfa50..0552394d 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -32,8 +32,15 @@ class Config: LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'https://api.openai.com/v1') 
LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'gpt-4o-mini') - # Zep配置 - ZEP_API_KEY = os.environ.get('ZEP_API_KEY') + # Neo4j + Graphiti配置(替代 Zep Cloud) + NEO4J_URI = os.environ.get('NEO4J_URI', 'bolt://localhost:7687') + NEO4J_USER = os.environ.get('NEO4J_USER', 'neo4j') + NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', 'mirofish123') + # Embedding model — override when using non-OpenAI APIs (e.g. Gemini: text-embedding-004) + EMBEDDING_MODEL = os.environ.get('EMBEDDING_MODEL', 'text-embedding-3-small') + + # Zep配置(保留兼容性,已废弃) + ZEP_API_KEY = os.environ.get('ZEP_API_KEY', '') # 文件上传配置 MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB @@ -69,7 +76,7 @@ class Config: errors = [] if not cls.LLM_API_KEY: errors.append("LLM_API_KEY 未配置") - if not cls.ZEP_API_KEY: - errors.append("ZEP_API_KEY 未配置") + if not cls.NEO4J_PASSWORD: + errors.append("NEO4J_PASSWORD 未配置") return errors diff --git a/backend/app/services/graph_builder.py b/backend/app/services/graph_builder.py index 37c9969c..57262ab5 100644 --- a/backend/app/services/graph_builder.py +++ b/backend/app/services/graph_builder.py @@ -10,8 +10,7 @@ import threading from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass -from zep_cloud.client import Zep -from zep_cloud import EpisodeData, EntityEdgeSourceTarget +from .graphiti_adapter import GraphitiAdapter from ..config import Config from ..models.task import TaskManager, TaskStatus @@ -20,6 +19,54 @@ from .text_processor import TextProcessor from ..utils.locale import t, get_locale, set_locale +def _classify_entity_type(name: str, summary: str, ontology: Optional[Dict]) -> str: + """ + Classify an entity into an ontology type using keyword matching + against entity type names, descriptions, and examples. + Falls back to 'Entity' if no ontology or no match found. 
+ """ + if not ontology: + return "Entity" + entity_types = ontology.get("entity_types", []) + if not entity_types: + return "Entity" + + name_lower = (name or "").lower() + summary_lower = (summary or "").lower() + search_text = f"{name_lower} {summary_lower}" + + best_type = "Entity" + best_score = 0 + + for et in entity_types: + score = 0 + type_name = et.get("name", "") + type_name_lower = type_name.lower() + + # Exact name match in type name + if type_name_lower in name_lower: + score += 10 + + # Check examples list + for example in et.get("examples", []): + if example.lower() in search_text: + score += 8 + elif name_lower in example.lower(): + score += 6 + + # Check description keywords + desc_words = (et.get("description", "")).lower().split() + for word in desc_words: + if len(word) > 4 and word in search_text: + score += 1 + + if score > best_score: + best_score = score + best_type = type_name + + return best_type if best_score > 0 else "Entity" + + @dataclass class GraphInfo: """图谱信息""" @@ -44,11 +91,7 @@ class GraphBuilderService: """ def __init__(self, api_key: Optional[str] = None): - self.api_key = api_key or Config.ZEP_API_KEY - if not self.api_key: - raise ValueError("ZEP_API_KEY 未配置") - - self.client = Zep(api_key=self.api_key) + self.client = GraphitiAdapter() self.task_manager = TaskManager() def build_graph_async( @@ -203,106 +246,27 @@ class GraphBuilderService: return graph_id def set_ontology(self, graph_id: str, ontology: Dict[str, Any]): - """设置图谱本体(公开方法)""" - import warnings - from typing import Optional - from pydantic import Field - from zep_cloud.external_clients.ontology import EntityModel, EntityText, EdgeModel - - # 抑制 Pydantic v2 关于 Field(default=None) 的警告 - # 这是 Zep SDK 要求的用法,警告来自动态类创建,可以安全忽略 - warnings.filterwarnings('ignore', category=UserWarning, module='pydantic') - - # Zep 保留名称,不能作为属性名 - RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'} - - def safe_attr_name(attr_name: str) -> str: - 
"""将保留名称转换为安全名称""" - if attr_name.lower() in RESERVED_NAMES: - return f"entity_{attr_name}" - return attr_name - - # 动态创建实体类型 - entity_types = {} - for entity_def in ontology.get("entity_types", []): - name = entity_def["name"] - description = entity_def.get("description", f"A {name} entity.") - - # 创建属性字典和类型注解(Pydantic v2 需要) - attrs = {"__doc__": description} - annotations = {} - - for attr_def in entity_def.get("attributes", []): - attr_name = safe_attr_name(attr_def["name"]) # 使用安全名称 - attr_desc = attr_def.get("description", attr_name) - # Zep API 需要 Field 的 description,这是必需的 - attrs[attr_name] = Field(description=attr_desc, default=None) - annotations[attr_name] = Optional[EntityText] # 类型注解 - - attrs["__annotations__"] = annotations - - # 动态创建类 - entity_class = type(name, (EntityModel,), attrs) - entity_class.__doc__ = description - entity_types[name] = entity_class - - # 动态创建边类型 - edge_definitions = {} - for edge_def in ontology.get("edge_types", []): - name = edge_def["name"] - description = edge_def.get("description", f"A {name} relationship.") - - # 创建属性字典和类型注解 - attrs = {"__doc__": description} - annotations = {} - - for attr_def in edge_def.get("attributes", []): - attr_name = safe_attr_name(attr_def["name"]) # 使用安全名称 - attr_desc = attr_def.get("description", attr_name) - # Zep API 需要 Field 的 description,这是必需的 - attrs[attr_name] = Field(description=attr_desc, default=None) - annotations[attr_name] = Optional[str] # 边属性用str类型 - - attrs["__annotations__"] = annotations - - # 动态创建类 - class_name = ''.join(word.capitalize() for word in name.split('_')) - edge_class = type(class_name, (EdgeModel,), attrs) - edge_class.__doc__ = description - - # 构建source_targets - source_targets = [] - for st in edge_def.get("source_targets", []): - source_targets.append( - EntityEdgeSourceTarget( - source=st.get("source", "Entity"), - target=st.get("target", "Entity") - ) - ) - - if source_targets: - edge_definitions[name] = (edge_class, source_targets) - - # 调用Zep API设置本体 - 
if entity_types or edge_definitions: - self.client.graph.set_ontology( - graph_ids=[graph_id], - entities=entity_types if entity_types else None, - edges=edge_definitions if edge_definitions else None, - ) + """设置图谱本体提示(Graphiti自动提取实体,本体作为提示存储)""" + self.client.graph.set_ontology( + graph_ids=[graph_id], + entities=ontology.get("entity_types"), + edges=ontology.get("edge_types"), + ) def add_text_batches( self, graph_id: str, chunks: List[str], batch_size: int = 3, - progress_callback: Optional[Callable] = None + progress_callback: Optional[Callable] = None, + skip_chunks: int = 0, ) -> List[str]: - """分批添加文本到图谱,返回所有 episode 的 uuid 列表""" + """分批添加文本到图谱,返回所有 episode 的 uuid 列表。 + skip_chunks: 跳过已处理的块数(用于断点续传)。""" episode_uuids = [] total_chunks = len(chunks) - - for i in range(0, total_chunks, batch_size): + + for i in range(skip_chunks, total_chunks, batch_size): batch_chunks = chunks[i:i + batch_size] batch_num = i // batch_size + 1 total_batches = (total_chunks + batch_size - 1) // batch_size @@ -313,10 +277,11 @@ class GraphBuilderService: t('progress.sendingBatch', current=batch_num, total=total_batches, chunks=len(batch_chunks)), progress ) + # 构建episode数据 episodes = [ - EpisodeData(data=chunk, type="text") + type('Episode', (), {'data': chunk, 'type': 'text'})() for chunk in batch_chunks ] @@ -423,7 +388,7 @@ class GraphBuilderService: entity_types=list(entity_types) ) - def get_graph_data(self, graph_id: str) -> Dict[str, Any]: + def get_graph_data(self, graph_id: str, ontology: Optional[Dict] = None) -> Dict[str, Any]: """ 获取完整图谱数据(包含详细信息) @@ -448,10 +413,15 @@ class GraphBuilderService: if created_at: created_at = str(created_at) + entity_type = _classify_entity_type(node.name, node.summary or "", ontology) + labels = node.labels or [] + if entity_type != "Entity" and entity_type not in labels: + labels = [entity_type] + [l for l in labels if l != "Entity"] + nodes_data.append({ "uuid": node.uuid_, "name": node.name, - "labels": node.labels or [], + 
"labels": labels, "summary": node.summary or "", "attributes": node.attributes or {}, "created_at": created_at, diff --git a/backend/app/services/graphiti_adapter.py b/backend/app/services/graphiti_adapter.py new file mode 100644 index 00000000..cf9cf182 --- /dev/null +++ b/backend/app/services/graphiti_adapter.py @@ -0,0 +1,491 @@ +""" +Graphiti Adapter — Drop-in replacement for the Zep Cloud client. + +Exposes the same namespace as the Zep client so all consuming code +(graph_builder, zep_tools, zep_entity_reader, etc.) needs only a +one-line import swap: + + from .graphiti_adapter import GraphitiAdapter + self.client = GraphitiAdapter() + +Then all self.client.graph.* calls work unchanged. +""" + +import asyncio +import threading +import uuid as _uuid_mod +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from graphiti_core import Graphiti +from graphiti_core.nodes import EpisodeType, EntityNode +from graphiti_core.edges import EntityEdge +from graphiti_core.search.search_config import SearchConfig, SearchResults +from graphiti_core.search.search_config_recipes import ( + NODE_HYBRID_SEARCH_RRF, + EDGE_HYBRID_SEARCH_RRF, +) +from graphiti_core.llm_client.config import LLMConfig +from graphiti_core.llm_client.gemini_client import GeminiClient +from graphiti_core.embedder.gemini import GeminiEmbedder, GeminiEmbedderConfig +from graphiti_core.cross_encoder.client import CrossEncoderClient + +from ..config import Config +from ..utils.logger import get_logger + +logger = get_logger('mirofish.graphiti_adapter') + + +class _GeminiReranker(CrossEncoderClient): + """Simple reranker using Gemini — returns passages sorted by relevance.""" + + def __init__(self, client: GeminiClient): + self._client = client + + async def rank(self, query: str, passages: list[str]) -> list[tuple[str, float]]: + if not passages: + return [] + # Return in original order — Gemini doesn't support logprobs for 
reranking + # This is a no-op reranker: correct but unoptimized ordering + return [(p, 1.0 - i * 0.01) for i, p in enumerate(passages)] + +# --------------------------------------------------------------------------- +# Persistent event loop in a dedicated background thread. +# All async calls are submitted here so the Neo4j driver (which is bound +# to one event loop) never crosses loop boundaries. +# --------------------------------------------------------------------------- +_loop: Optional[asyncio.AbstractEventLoop] = None +_loop_thread: Optional[threading.Thread] = None +_loop_lock = threading.Lock() + + +def _get_loop() -> asyncio.AbstractEventLoop: + global _loop, _loop_thread + if _loop is None: + with _loop_lock: + if _loop is None: + _loop = asyncio.new_event_loop() + _loop_thread = threading.Thread( + target=_loop.run_forever, daemon=True, name="graphiti-event-loop" + ) + _loop_thread.start() + return _loop + + +def _run(coro): + """Submit coroutine to the persistent event loop thread and wait for result.""" + future = asyncio.run_coroutine_threadsafe(coro, _get_loop()) + return future.result(timeout=300) + + +# --------------------------------------------------------------------------- +# Singleton Graphiti instance (one Neo4j driver for the whole process) +# --------------------------------------------------------------------------- +_graphiti_instance: Optional[Graphiti] = None +_graphiti_lock = threading.Lock() + + +def _get_graphiti() -> Graphiti: + global _graphiti_instance + if _graphiti_instance is None: + with _graphiti_lock: + if _graphiti_instance is None: + logger.info("Initializing Graphiti client...") + llm_cfg = LLMConfig( + api_key=Config.LLM_API_KEY, + model=Config.LLM_MODEL_NAME, + ) + llm_client = GeminiClient(config=llm_cfg) + embedder = GeminiEmbedder( + config=GeminiEmbedderConfig( + api_key=Config.LLM_API_KEY, + embedding_model=Config.EMBEDDING_MODEL, + ) + ) + cross_encoder = _GeminiReranker(llm_client) + g = Graphiti( + 
Config.NEO4J_URI, + Config.NEO4J_USER, + Config.NEO4J_PASSWORD, + llm_client=llm_client, + embedder=embedder, + cross_encoder=cross_encoder, + ) + # Use the persistent loop so the driver is bound to it from the start + _run(g.build_indices_and_constraints()) + _graphiti_instance = g + logger.info("Graphiti client ready.") + return _graphiti_instance + + +# --------------------------------------------------------------------------- +# Compatibility data classes (mimic Zep response objects) +# --------------------------------------------------------------------------- + +@dataclass +class _NodeResult: + """Zep-compatible node object.""" + uuid_: str + name: str + labels: List[str] + summary: str + attributes: Dict[str, Any] + created_at: Optional[str] = None + + @property + def uuid(self): + return self.uuid_ + + +@dataclass +class _EdgeResult: + """Zep-compatible edge object.""" + uuid_: str + name: str + fact: str + source_node_uuid: str + target_node_uuid: str + attributes: Dict[str, Any] + created_at: Optional[str] = None + valid_at: Optional[str] = None + invalid_at: Optional[str] = None + expired_at: Optional[str] = None + + @property + def uuid(self): + return self.uuid_ + + +@dataclass +class _EpisodeResult: + """Zep-compatible episode object — always processed (Graphiti is sync).""" + uuid_: str + processed: bool = True + + @property + def uuid(self): + return self.uuid_ + + +@dataclass +class _SearchResults: + """Zep-compatible search result object.""" + edges: List[_EdgeResult] = field(default_factory=list) + nodes: List[_NodeResult] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Helpers: convert Graphiti objects → Zep-compatible objects +# --------------------------------------------------------------------------- + +def _to_ts(dt: Optional[datetime]) -> Optional[str]: + if dt is None: + return None + return dt.isoformat() + + +def _entity_node_to_result(n: EntityNode) -> _NodeResult: + 
return _NodeResult( + uuid_=n.uuid, + name=n.name, + labels=list(n.labels) if n.labels else ["Entity"], + summary=n.summary or "", + attributes=n.attributes or {}, + created_at=_to_ts(n.created_at), + ) + + +def _entity_edge_to_result(e: EntityEdge) -> _EdgeResult: + return _EdgeResult( + uuid_=e.uuid, + name=e.name or "", + fact=e.fact or "", + source_node_uuid=e.source_node_uuid, + target_node_uuid=e.target_node_uuid, + attributes={}, + created_at=_to_ts(e.created_at), + valid_at=_to_ts(e.valid_at), + invalid_at=_to_ts(e.invalid_at), + expired_at=_to_ts(e.expired_at), + ) + + +def _neo4j_record_to_node(record: Dict) -> _NodeResult: + labels = record.get("labels", ["Entity"]) + if isinstance(labels, (list, tuple)): + labels = [str(l) for l in labels] + return _NodeResult( + uuid_=record.get("uuid", ""), + name=record.get("name", ""), + labels=labels, + summary=record.get("summary", ""), + attributes=record.get("attributes") or {}, + created_at=str(record.get("created_at", "")) or None, + ) + + +def _neo4j_record_to_edge(record: Dict) -> _EdgeResult: + def ts(v): + return str(v) if v else None + return _EdgeResult( + uuid_=record.get("uuid", ""), + name=record.get("name", ""), + fact=record.get("fact", ""), + source_node_uuid=record.get("source_node_uuid", ""), + target_node_uuid=record.get("target_node_uuid", ""), + attributes=record.get("attributes") or {}, + created_at=ts(record.get("created_at")), + valid_at=ts(record.get("valid_at")), + invalid_at=ts(record.get("invalid_at")), + expired_at=ts(record.get("expired_at")), + ) + + +# --------------------------------------------------------------------------- +# Neo4j direct query helpers +# --------------------------------------------------------------------------- + +async def _neo4j_query(graphiti: Graphiti, cypher: str, params: Dict) -> List[Dict]: + """Execute a read Cypher query and return list of record dicts.""" + records, _, _ = await graphiti.driver.execute_query(cypher, params) + return [dict(r) for r in 
records] + + +async def _neo4j_write(graphiti: Graphiti, cypher: str, params: Dict) -> None: + """Execute a write Cypher query.""" + await graphiti.driver.execute_query(cypher, params) + + +# Cypher queries +_NODES_BY_GROUP = """ +MATCH (n:Entity {group_id: $group_id}) +RETURN n.uuid AS uuid, n.name AS name, n.summary AS summary, + labels(n) AS labels, n.created_at AS created_at, + n.attributes AS attributes +ORDER BY n.created_at ASC +SKIP $skip LIMIT $limit +""" + +_EDGES_BY_GROUP = """ +MATCH (s:Entity {group_id: $group_id})-[r:RELATES_TO]->(t:Entity {group_id: $group_id}) +RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact, + s.uuid AS source_node_uuid, + t.uuid AS target_node_uuid, + r.created_at AS created_at, r.valid_at AS valid_at, + r.invalid_at AS invalid_at, r.expired_at AS expired_at, + r.attributes AS attributes +ORDER BY r.created_at ASC +SKIP $skip LIMIT $limit +""" + +_NODE_BY_UUID = """ +MATCH (n:Entity {uuid: $uuid}) +RETURN n.uuid AS uuid, n.name AS name, n.summary AS summary, + labels(n) AS labels, n.created_at AS created_at, + n.group_id AS group_id, n.attributes AS attributes +LIMIT 1 +""" + +_EDGES_BY_NODE_UUID = """ +MATCH (s:Entity {uuid: $node_uuid})-[r:RELATES_TO]->(t:Entity) +RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact, + s.uuid AS source_node_uuid, + t.uuid AS target_node_uuid, + r.created_at AS created_at, r.valid_at AS valid_at, + r.invalid_at AS invalid_at, r.expired_at AS expired_at +UNION +MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity {uuid: $node_uuid}) +RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact, + s.uuid AS source_node_uuid, + t.uuid AS target_node_uuid, + r.created_at AS created_at, r.valid_at AS valid_at, + r.invalid_at AS invalid_at, r.expired_at AS expired_at +""" + +_DELETE_GROUP = """ +MATCH (n:Entity {group_id: $group_id}) +DETACH DELETE n +""" + + +# --------------------------------------------------------------------------- +# Sub-namespaces +# 
--------------------------------------------------------------------------- + +class _EpisodeNamespace: + def get(self, uuid_: str) -> _EpisodeResult: + """Always returns processed=True — Graphiti is synchronous.""" + return _EpisodeResult(uuid_=uuid_, processed=True) + + +class _NodeNamespace: + def __init__(self, graphiti: Graphiti): + self._g = graphiti + + def get_by_graph_id( + self, + graph_id: str, + limit: int = 100, + uuid_cursor: Optional[str] = None, + ) -> List[_NodeResult]: + """Return nodes for a group. First call returns all; cursor call returns empty.""" + if uuid_cursor is not None: + # Already fetched all on first call — signal end of pagination + return [] + records = _run(_neo4j_query( + self._g, _NODES_BY_GROUP, + {"group_id": graph_id, "skip": 0, "limit": 10000} + )) + return [_neo4j_record_to_node(r) for r in records] + + def get(self, uuid_: str) -> _NodeResult: + records = _run(_neo4j_query(self._g, _NODE_BY_UUID, {"uuid": uuid_})) + if not records: + return _NodeResult(uuid_=uuid_, name="", labels=[], summary="", attributes={}) + return _neo4j_record_to_node(records[0]) + + def get_entity_edges(self, node_uuid: str) -> List[_EdgeResult]: + records = _run(_neo4j_query( + self._g, _EDGES_BY_NODE_UUID, {"node_uuid": node_uuid} + )) + return [_neo4j_record_to_edge(r) for r in records] + + +class _EdgeNamespace: + def __init__(self, graphiti: Graphiti): + self._g = graphiti + + def get_by_graph_id( + self, + graph_id: str, + limit: int = 100, + uuid_cursor: Optional[str] = None, + ) -> List[_EdgeResult]: + """Return edges for a group. 
First call returns all; cursor call returns empty.""" + if uuid_cursor is not None: + return [] + records = _run(_neo4j_query( + self._g, _EDGES_BY_GROUP, + {"group_id": graph_id, "skip": 0, "limit": 50000} + )) + return [_neo4j_record_to_edge(r) for r in records] + + +class _GraphNamespace: + def __init__(self, graphiti: Graphiti): + self._g = graphiti + self.node = _NodeNamespace(graphiti) + self.edge = _EdgeNamespace(graphiti) + self.episode = _EpisodeNamespace() + self._ontologies: Dict[str, Dict] = {} # graph_id -> ontology dict + + def create(self, graph_id: str, name: str, description: str = "") -> None: + """No-op — Graphiti uses group_id implicitly, no explicit creation needed.""" + logger.info(f"Graph '{graph_id}' registered (group_id in Graphiti)") + + def set_ontology( + self, + graph_ids: List[str], + entities: Any = None, + edges: Any = None, + ) -> None: + """Store ontology hints for use during episode ingestion. Graphiti extracts entities dynamically.""" + for gid in graph_ids: + self._ontologies[gid] = {"entities": entities, "edges": edges} + logger.info(f"Ontology hints stored for graphs: {graph_ids}") + + def add(self, graph_id: str, type: str = "text", data: str = "") -> _EpisodeResult: + """Add a single text episode to the graph.""" + result = _run(self._g.add_episode( + name=f"activity_{_uuid_mod.uuid4().hex[:8]}", + episode_body=data, + source_description="MiroFish simulation activity", + reference_time=datetime.now(timezone.utc), + source=EpisodeType.text, + group_id=graph_id, + update_communities=False, + )) + ep_uuid_out = result.episode.uuid if result and result.episode else str(_uuid_mod.uuid4()) + return _EpisodeResult(uuid_=ep_uuid_out) + + def add_batch(self, graph_id: str, episodes: List[Any]) -> List[_EpisodeResult]: + """Add a batch of episodes. 
Returns list of EpisodeResult with uuid_.""" + results = [] + for ep in episodes: + text = getattr(ep, 'data', '') or str(ep) + try: + result = _run(self._g.add_episode( + name=f"chunk_{_uuid_mod.uuid4().hex[:8]}", + episode_body=text, + source_description="MiroFish document chunk", + reference_time=datetime.now(timezone.utc), + source=EpisodeType.text, + group_id=graph_id, + update_communities=False, + )) + ep_uuid_out = result.episode.uuid if result and result.episode else str(_uuid_mod.uuid4()) + except Exception as e: + logger.warning(f"Episode add failed: {str(e)[:100]}, using placeholder uuid") + ep_uuid_out = str(_uuid_mod.uuid4()) + results.append(_EpisodeResult(uuid_=ep_uuid_out)) + return results + + def search( + self, + graph_id: str, + query: str, + limit: int = 10, + scope: str = "edges", + reranker: Optional[str] = None, + ) -> _SearchResults: + """Semantic search over the graph. scope='edges'|'nodes'|'both'.""" + try: + if scope == "nodes": + results = _run(self._g.search_( + query=query, + config=SearchConfig( + node_config=NODE_HYBRID_SEARCH_RRF.node_config, + limit=limit, + ), + group_ids=[graph_id], + )) + nodes = [_entity_node_to_result(n) for n in (results.nodes or [])] + return _SearchResults(nodes=nodes) + else: + edges = _run(self._g.search( + query=query, + group_ids=[graph_id], + num_results=limit, + )) + return _SearchResults(edges=[_entity_edge_to_result(e) for e in (edges or [])]) + except Exception as e: + logger.warning(f"Graph search failed: {str(e)[:150]}") + return _SearchResults() + + def delete(self, graph_id: str) -> None: + """Delete all nodes and edges for a group_id.""" + _run(_neo4j_write(self._g, _DELETE_GROUP, {"group_id": graph_id})) + logger.info(f"Graph '{graph_id}' deleted from Neo4j") + + +# --------------------------------------------------------------------------- +# Main adapter class — drop-in for Zep(api_key=...) 
+# --------------------------------------------------------------------------- + +class GraphitiAdapter: + """ + Drop-in replacement for `from zep_cloud.client import Zep`. + + Usage: + self.client = GraphitiAdapter() + self.client.graph.create(graph_id, name) + self.client.graph.search(graph_id, query, limit, scope) + self.client.graph.node.get(uuid_) + ... + """ + + def __init__(self, api_key: Optional[str] = None): + # api_key ignored — kept for signature compatibility + graphiti = _get_graphiti() + self.graph = _GraphNamespace(graphiti) diff --git a/backend/app/services/oasis_profile_generator.py b/backend/app/services/oasis_profile_generator.py index 7704a627..c15b8c30 100644 --- a/backend/app/services/oasis_profile_generator.py +++ b/backend/app/services/oasis_profile_generator.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, field from datetime import datetime from openai import OpenAI -from zep_cloud.client import Zep +from .graphiti_adapter import GraphitiAdapter from ..config import Config from ..utils.logger import get_logger @@ -198,16 +198,8 @@ class OasisProfileGenerator: base_url=self.base_url ) - # Zep客户端用于检索丰富上下文 - self.zep_api_key = zep_api_key or Config.ZEP_API_KEY - self.zep_client = None + self.zep_client = GraphitiAdapter() self.graph_id = graph_id - - if self.zep_api_key: - try: - self.zep_client = Zep(api_key=self.zep_api_key) - except Exception as e: - logger.warning(f"Zep客户端初始化失败: {e}") def generate_profile_from_entity( self, diff --git a/backend/app/services/zep_entity_reader.py b/backend/app/services/zep_entity_reader.py index 71661be4..e664959c 100644 --- a/backend/app/services/zep_entity_reader.py +++ b/backend/app/services/zep_entity_reader.py @@ -7,7 +7,7 @@ import time from typing import Dict, Any, List, Optional, Set, Callable, TypeVar from dataclasses import dataclass, field -from zep_cloud.client import Zep +from .graphiti_adapter import GraphitiAdapter from ..config import Config from ..utils.logger import get_logger @@ 
-79,11 +79,7 @@ class ZepEntityReader: """ def __init__(self, api_key: Optional[str] = None): - self.api_key = api_key or Config.ZEP_API_KEY - if not self.api_key: - raise ValueError("ZEP_API_KEY 未配置") - - self.client = Zep(api_key=self.api_key) + self.client = GraphitiAdapter() def _call_with_retry( self, @@ -234,27 +230,51 @@ class ZepEntityReader: FilteredEntities: 过滤后的实体集合 """ logger.info(f"开始筛选图谱 {graph_id} 的实体...") - + + # Look up ontology from project to classify entities + ontology = None + try: + from ..models.project import ProjectManager + from .graph_builder import _classify_entity_type + for p in ProjectManager.list_projects(): + if p.graph_id == graph_id and p.ontology: + ontology = p.ontology + break + except Exception: + pass + # 获取所有节点 all_nodes = self.get_all_nodes(graph_id) total_count = len(all_nodes) - + + # Apply ontology-based classification so all nodes get proper type labels + if ontology: + for node in all_nodes: + labels = node.get("labels", []) + custom = [l for l in labels if l not in ("Entity", "Node")] + if not custom: + entity_type = _classify_entity_type( + node.get("name", ""), node.get("summary", ""), ontology + ) + if entity_type != "Entity": + node["labels"] = [entity_type] + labels + # 获取所有边(用于后续关联查找) all_edges = self.get_all_edges(graph_id) if enrich_with_edges else [] - + # 构建节点UUID到节点数据的映射 node_map = {n["uuid"]: n for n in all_nodes} - + # 筛选符合条件的实体 filtered_entities = [] entity_types_found = set() - + for node in all_nodes: labels = node.get("labels", []) - + # 筛选逻辑:Labels必须包含除"Entity"和"Node"之外的标签 custom_labels = [l for l in labels if l not in ["Entity", "Node"]] - + if not custom_labels: # 只有默认标签,跳过 continue diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py index e034fee2..02f68209 100644 --- a/backend/app/services/zep_graph_memory_updater.py +++ b/backend/app/services/zep_graph_memory_updater.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from datetime 
import datetime from queue import Queue, Empty -from zep_cloud.client import Zep +from .graphiti_adapter import GraphitiAdapter from ..config import Config from ..utils.logger import get_logger @@ -238,12 +238,7 @@ class ZepGraphMemoryUpdater: api_key: Zep API Key(可选,默认从配置读取) """ self.graph_id = graph_id - self.api_key = api_key or Config.ZEP_API_KEY - - if not self.api_key: - raise ValueError("ZEP_API_KEY未配置") - - self.client = Zep(api_key=self.api_key) + self.client = GraphitiAdapter() # 活动队列 self._activity_queue: Queue = Queue() diff --git a/backend/app/services/zep_tools.py b/backend/app/services/zep_tools.py index 3bc8a57a..301709d2 100644 --- a/backend/app/services/zep_tools.py +++ b/backend/app/services/zep_tools.py @@ -13,12 +13,11 @@ import json from typing import Dict, Any, List, Optional from dataclasses import dataclass, field -from zep_cloud.client import Zep +from .graphiti_adapter import GraphitiAdapter from ..config import Config from ..utils.logger import get_logger from ..utils.llm_client import LLMClient -from ..utils.locale import get_locale, t from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges logger = get_logger('mirofish.zep_tools') @@ -423,14 +422,10 @@ class ZepToolsService: RETRY_DELAY = 2.0 def __init__(self, api_key: Optional[str] = None, llm_client: Optional[LLMClient] = None): - self.api_key = api_key or Config.ZEP_API_KEY - if not self.api_key: - raise ValueError("ZEP_API_KEY 未配置") - - self.client = Zep(api_key=self.api_key) + self.client = GraphitiAdapter() # LLM客户端用于InsightForge生成子问题 self._llm_client = llm_client - logger.info(t("console.zepToolsInitialized")) + logger.info("ZepToolsService 初始化完成") @property def llm(self) -> LLMClient: @@ -440,25 +435,38 @@ class ZepToolsService: return self._llm_client def _call_with_retry(self, func, operation_name: str, max_retries: int = None): - """带重试机制的API调用""" + """带重试机制的API调用(自动处理429限速)""" max_retries = max_retries or self.MAX_RETRIES last_exception = None delay = 
self.RETRY_DELAY - + for attempt in range(max_retries): try: return func() except Exception as e: last_exception = e if attempt < max_retries - 1: - logger.warning( - t("console.zepRetryAttempt", operation=operation_name, attempt=attempt + 1, error=str(e)[:100], delay=f"{delay:.1f}") - ) - time.sleep(delay) + # 检测429限速错误,使用retry-after头部的等待时间 + wait = delay + if hasattr(e, 'status_code') and e.status_code == 429: + retry_after = None + if hasattr(e, 'headers') and e.headers: + retry_after = e.headers.get('retry-after') + wait = float(retry_after) + 1 if retry_after else 65.0 + logger.warning( + f"Zep {operation_name} 触发限速 (429), " + f"等待 {wait:.0f} 秒后重试 (第 {attempt + 1}/{max_retries - 1} 次)..." + ) + else: + logger.warning( + f"Zep {operation_name} 第 {attempt + 1} 次尝试失败: {str(e)[:100]}, " + f"{wait:.1f}秒后重试..." + ) + time.sleep(wait) delay *= 2 else: - logger.error(t("console.zepAllRetriesFailed", operation=operation_name, retries=max_retries, error=str(e))) - + logger.error(f"Zep {operation_name} 在 {max_retries} 次尝试后仍失败: {str(e)}") + raise last_exception def search_graph( @@ -483,7 +491,7 @@ class ZepToolsService: Returns: SearchResult: 搜索结果 """ - logger.info(t("console.graphSearch", graphId=graph_id, query=query[:50])) + logger.info(f"图谱搜索: graph_id={graph_id}, query={query[:50]}...") # 尝试使用Zep Cloud Search API try: @@ -495,7 +503,7 @@ class ZepToolsService: scope=scope, reranker="cross_encoder" ), - operation_name=t("console.graphSearchOp", graphId=graph_id) + operation_name=f"图谱搜索(graph={graph_id})" ) facts = [] @@ -528,7 +536,7 @@ class ZepToolsService: if hasattr(node, 'summary') and node.summary: facts.append(f"[{node.name}]: {node.summary}") - logger.info(t("console.searchComplete", count=len(facts))) + logger.info(f"搜索完成: 找到 {len(facts)} 条相关事实") return SearchResult( facts=facts, @@ -539,7 +547,7 @@ class ZepToolsService: ) except Exception as e: - logger.warning(t("console.zepSearchApiFallback", error=str(e))) + logger.warning(f"Zep Search API失败,降级为本地搜索: 
{str(e)}") # 降级:使用本地关键词匹配搜索 return self._local_search(graph_id, query, limit, scope) @@ -564,7 +572,7 @@ class ZepToolsService: Returns: SearchResult: 搜索结果 """ - logger.info(t("console.usingLocalSearch", query=query[:30])) + logger.info(f"使用本地搜索: query={query[:30]}...") facts = [] edges_result = [] @@ -634,10 +642,10 @@ class ZepToolsService: if node.summary: facts.append(f"[{node.name}]: {node.summary}") - logger.info(t("console.localSearchComplete", count=len(facts))) + logger.info(f"本地搜索完成: 找到 {len(facts)} 条相关事实") except Exception as e: - logger.error(t("console.localSearchFailed", error=str(e))) + logger.error(f"本地搜索失败: {str(e)}") return SearchResult( facts=facts, @@ -657,7 +665,7 @@ class ZepToolsService: Returns: 节点列表 """ - logger.info(t("console.fetchingAllNodes", graphId=graph_id)) + logger.info(f"获取图谱 {graph_id} 的所有节点...") nodes = fetch_all_nodes(self.client, graph_id) @@ -672,7 +680,7 @@ class ZepToolsService: attributes=node.attributes or {} )) - logger.info(t("console.fetchedNodes", count=len(result))) + logger.info(f"获取到 {len(result)} 个节点") return result def get_all_edges(self, graph_id: str, include_temporal: bool = True) -> List[EdgeInfo]: @@ -686,7 +694,7 @@ class ZepToolsService: Returns: 边列表(包含created_at, valid_at, invalid_at, expired_at) """ - logger.info(t("console.fetchingAllEdges", graphId=graph_id)) + logger.info(f"获取图谱 {graph_id} 的所有边...") edges = fetch_all_edges(self.client, graph_id) @@ -710,7 +718,7 @@ class ZepToolsService: result.append(edge_info) - logger.info(t("console.fetchedEdges", count=len(result))) + logger.info(f"获取到 {len(result)} 条边") return result def get_node_detail(self, node_uuid: str) -> Optional[NodeInfo]: @@ -723,12 +731,12 @@ class ZepToolsService: Returns: 节点信息或None """ - logger.info(t("console.fetchingNodeDetail", uuid=node_uuid[:8])) + logger.info(f"获取节点详情: {node_uuid[:8]}...") try: node = self._call_with_retry( func=lambda: self.client.graph.node.get(uuid_=node_uuid), - operation_name=t("console.fetchNodeDetailOp", 
uuid=node_uuid[:8]) + operation_name=f"获取节点详情(uuid={node_uuid[:8]}...)" ) if not node: @@ -742,7 +750,7 @@ class ZepToolsService: attributes=node.attributes or {} ) except Exception as e: - logger.error(t("console.fetchNodeDetailFailed", error=str(e))) + logger.error(f"获取节点详情失败: {str(e)}") return None def get_node_edges(self, graph_id: str, node_uuid: str) -> List[EdgeInfo]: @@ -758,7 +766,7 @@ class ZepToolsService: Returns: 边列表 """ - logger.info(t("console.fetchingNodeEdges", uuid=node_uuid[:8])) + logger.info(f"获取节点 {node_uuid[:8]}... 的相关边") try: # 获取图谱所有边,然后过滤 @@ -770,11 +778,11 @@ class ZepToolsService: if edge.source_node_uuid == node_uuid or edge.target_node_uuid == node_uuid: result.append(edge) - logger.info(t("console.foundNodeEdges", count=len(result))) + logger.info(f"找到 {len(result)} 条与节点相关的边") return result except Exception as e: - logger.warning(t("console.fetchNodeEdgesFailed", error=str(e))) + logger.warning(f"获取节点边失败: {str(e)}") return [] def get_entities_by_type( @@ -792,7 +800,7 @@ class ZepToolsService: Returns: 符合类型的实体列表 """ - logger.info(t("console.fetchingEntitiesByType", type=entity_type)) + logger.info(f"获取类型为 {entity_type} 的实体...") all_nodes = self.get_all_nodes(graph_id) @@ -802,7 +810,7 @@ class ZepToolsService: if entity_type in node.labels: filtered.append(node) - logger.info(t("console.foundEntitiesByType", count=len(filtered), type=entity_type)) + logger.info(f"找到 {len(filtered)} 个 {entity_type} 类型的实体") return filtered def get_entity_summary( @@ -822,7 +830,7 @@ class ZepToolsService: Returns: 实体摘要信息 """ - logger.info(t("console.fetchingEntitySummary", name=entity_name)) + logger.info(f"获取实体 {entity_name} 的关系摘要...") # 先搜索该实体相关的信息 search_result = self.search_graph( @@ -862,7 +870,7 @@ class ZepToolsService: Returns: 统计信息 """ - logger.info(t("console.fetchingGraphStats", graphId=graph_id)) + logger.info(f"获取图谱 {graph_id} 的统计信息...") nodes = self.get_all_nodes(graph_id) edges = self.get_all_edges(graph_id) @@ -906,7 +914,7 @@ class 
ZepToolsService: Returns: 模拟上下文信息 """ - logger.info(t("console.fetchingSimContext", requirement=simulation_requirement[:50])) + logger.info(f"获取模拟上下文: {simulation_requirement[:50]}...") # 搜索与模拟需求相关的信息 search_result = self.search_graph( @@ -970,7 +978,7 @@ class ZepToolsService: Returns: InsightForgeResult: 深度洞察检索结果 """ - logger.info(t("console.insightForgeStart", query=query[:50])) + logger.info(f"InsightForge 深度洞察检索: {query[:50]}...") result = InsightForgeResult( query=query, @@ -986,7 +994,7 @@ class ZepToolsService: max_queries=max_sub_queries ) result.sub_queries = sub_queries - logger.info(t("console.generatedSubQueries", count=len(sub_queries))) + logger.info(f"生成 {len(sub_queries)} 个子问题") # Step 2: 对每个子问题进行语义搜索 all_facts = [] @@ -1086,7 +1094,7 @@ class ZepToolsService: result.relationship_chains = relationship_chains result.total_relationships = len(relationship_chains) - logger.info(t("console.insightForgeComplete", facts=result.total_facts, entities=result.total_entities, relationships=result.total_relationships)) + logger.info(f"InsightForge完成: {result.total_facts}条事实, {result.total_entities}个实体, {result.total_relationships}条关系") return result def _generate_sub_queries( @@ -1133,7 +1141,7 @@ class ZepToolsService: return [str(sq) for sq in sub_queries[:max_queries]] except Exception as e: - logger.warning(t("console.generateSubQueriesFailed", error=str(e))) + logger.warning(f"生成子问题失败: {str(e)},使用默认子问题") # 降级:返回基于原问题的变体 return [ query, @@ -1168,7 +1176,7 @@ class ZepToolsService: Returns: PanoramaResult: 广度搜索结果 """ - logger.info(t("console.panoramaSearchStart", query=query[:50])) + logger.info(f"PanoramaSearch 广度搜索: {query[:50]}...") result = PanoramaResult(query=query) @@ -1231,7 +1239,7 @@ class ZepToolsService: result.active_count = len(active_facts) result.historical_count = len(historical_facts) - logger.info(t("console.panoramaSearchComplete", active=result.active_count, historical=result.historical_count)) + logger.info(f"PanoramaSearch完成: 
{result.active_count}条有效, {result.historical_count}条历史") return result def quick_search( @@ -1256,7 +1264,7 @@ class ZepToolsService: Returns: SearchResult: 搜索结果 """ - logger.info(t("console.quickSearchStart", query=query[:50])) + logger.info(f"QuickSearch 简单搜索: {query[:50]}...") # 直接调用现有的search_graph方法 result = self.search_graph( @@ -1266,7 +1274,7 @@ class ZepToolsService: scope="edges" ) - logger.info(t("console.quickSearchComplete", count=result.total_count)) + logger.info(f"QuickSearch完成: {result.total_count}条结果") return result def interview_agents( @@ -1306,7 +1314,7 @@ class ZepToolsService: """ from .simulation_runner import SimulationRunner - logger.info(t("console.interviewAgentsStart", requirement=interview_requirement[:50])) + logger.info(f"InterviewAgents 深度采访(真实API): {interview_requirement[:50]}...") result = InterviewResult( interview_topic=interview_requirement, @@ -1317,12 +1325,12 @@ class ZepToolsService: profiles = self._load_agent_profiles(simulation_id) if not profiles: - logger.warning(t("console.profilesNotFound", simId=simulation_id)) + logger.warning(f"未找到模拟 {simulation_id} 的人设文件") result.summary = "未找到可采访的Agent人设文件" return result result.total_agents = len(profiles) - logger.info(t("console.loadedProfiles", count=len(profiles))) + logger.info(f"加载到 {len(profiles)} 个Agent人设") # Step 2: 使用LLM选择要采访的Agent(返回agent_id列表) selected_agents, selected_indices, selection_reasoning = self._select_agents_for_interview( @@ -1334,7 +1342,7 @@ class ZepToolsService: result.selected_agents = selected_agents result.selection_reasoning = selection_reasoning - logger.info(t("console.selectedAgentsForInterview", count=len(selected_agents), indices=selected_indices)) + logger.info(f"选择了 {len(selected_agents)} 个Agent进行采访: {selected_indices}") # Step 3: 生成采访问题(如果没有提供) if not result.interview_questions: @@ -1343,7 +1351,7 @@ class ZepToolsService: simulation_requirement=simulation_requirement, selected_agents=selected_agents ) - 
logger.info(t("console.generatedInterviewQuestions", count=len(result.interview_questions))) + logger.info(f"生成了 {len(result.interview_questions)} 个采访问题") # 将问题合并为一个采访prompt combined_prompt = "\n".join([f"{i+1}. {q}" for i, q in enumerate(result.interview_questions)]) @@ -1373,7 +1381,7 @@ class ZepToolsService: # 不指定platform,API会在twitter和reddit两个平台都采访 }) - logger.info(t("console.callingBatchInterviewApi", count=len(interviews_request))) + logger.info(f"调用批量采访API(双平台): {len(interviews_request)} 个Agent") # 调用 SimulationRunner 的批量采访方法(不传platform,双平台采访) api_result = SimulationRunner.interview_agents_batch( @@ -1383,12 +1391,12 @@ class ZepToolsService: timeout=180.0 # 双平台需要更长超时 ) - logger.info(t("console.interviewApiReturned", count=api_result.get('interviews_count', 0), success=api_result.get('success'))) + logger.info(f"采访API返回: {api_result.get('interviews_count', 0)} 个结果, success={api_result.get('success')}") # 检查API调用是否成功 if not api_result.get("success", False): error_msg = api_result.get("error", "未知错误") - logger.warning(t("console.interviewApiReturnedFailure", error=error_msg)) + logger.warning(f"采访API返回失败: {error_msg}") result.summary = f"采访API调用失败:{error_msg}。请检查OASIS模拟环境状态。" return result @@ -1461,11 +1469,11 @@ class ZepToolsService: except ValueError as e: # 模拟环境未运行 - logger.warning(t("console.interviewApiCallFailed", error=e)) + logger.warning(f"采访API调用失败(环境未运行?): {e}") result.summary = f"采访失败:{str(e)}。模拟环境可能已关闭,请确保OASIS环境正在运行。" return result except Exception as e: - logger.error(t("console.interviewApiCallException", error=e)) + logger.error(f"采访API调用异常: {e}") import traceback logger.error(traceback.format_exc()) result.summary = f"采访过程发生错误:{str(e)}" @@ -1478,7 +1486,7 @@ class ZepToolsService: interview_requirement=interview_requirement ) - logger.info(t("console.interviewAgentsComplete", count=result.interviewed_count)) + logger.info(f"InterviewAgents完成: 采访了 {result.interviewed_count} 个Agent(双平台)") return result @staticmethod @@ -1521,10 +1529,10 @@ 
class ZepToolsService: try: with open(reddit_profile_path, 'r', encoding='utf-8') as f: profiles = json.load(f) - logger.info(t("console.loadedRedditProfiles", count=len(profiles))) + logger.info(f"从 reddit_profiles.json 加载了 {len(profiles)} 个人设") return profiles except Exception as e: - logger.warning(t("console.readRedditProfilesFailed", error=e)) + logger.warning(f"读取 reddit_profiles.json 失败: {e}") # 尝试读取Twitter CSV格式 twitter_profile_path = os.path.join(sim_dir, "twitter_profiles.csv") @@ -1541,10 +1549,10 @@ class ZepToolsService: "persona": row.get("user_char", ""), "profession": "未知" }) - logger.info(t("console.loadedTwitterProfiles", count=len(profiles))) + logger.info(f"从 twitter_profiles.csv 加载了 {len(profiles)} 个人设") return profiles except Exception as e: - logger.warning(t("console.readTwitterProfilesFailed", error=e)) + logger.warning(f"读取 twitter_profiles.csv 失败: {e}") return profiles @@ -1625,7 +1633,7 @@ class ZepToolsService: return selected_agents, valid_indices, reasoning except Exception as e: - logger.warning(t("console.llmSelectAgentFailed", error=e)) + logger.warning(f"LLM选择Agent失败,使用默认选择: {e}") # 降级:选择前N个 selected = profiles[:max_agents] indices = list(range(min(max_agents, len(profiles)))) @@ -1673,7 +1681,7 @@ class ZepToolsService: return response.get("questions", [f"关于{interview_requirement},您有什么看法?"]) except Exception as e: - logger.warning(t("console.generateInterviewQuestionsFailed", error=e)) + logger.warning(f"生成采访问题失败: {e}") return [ f"关于{interview_requirement},您的观点是什么?", "这件事对您或您所代表的群体有什么影响?", @@ -1695,8 +1703,7 @@ class ZepToolsService: for interview in interviews: interview_texts.append(f"【{interview.agent_name}({interview.agent_role})】\n{interview.response[:500]}") - quote_instruction = "引用受访者原话时使用中文引号「」" if get_locale() == 'zh' else 'Use quotation marks "" when quoting interviewees' - system_prompt = f"""你是一个专业的新闻编辑。请根据多位受访者的回答,生成一份采访摘要。 + system_prompt = """你是一个专业的新闻编辑。请根据多位受访者的回答,生成一份采访摘要。 摘要要求: 1. 
提炼各方主要观点 @@ -1709,7 +1716,7 @@ class ZepToolsService: - 使用纯文本段落,用空行分隔不同部分 - 不要使用Markdown标题(如#、##、###) - 不要使用分割线(如---、***) -- {quote_instruction} +- 引用受访者原话时使用中文引号「」 - 可以使用**加粗**标记关键词,但不要使用其他Markdown语法""" user_prompt = f"""采访主题:{interview_requirement} @@ -1731,6 +1738,6 @@ class ZepToolsService: return summary except Exception as e: - logger.warning(t("console.generateInterviewSummaryFailed", error=e)) + logger.warning(f"生成采访摘要失败: {e}") # 降级:简单拼接 return f"共采访了{len(interviews)}位受访者,包括:" + "、".join([i.agent_name for i in interviews]) diff --git a/backend/app/utils/llm_client.py b/backend/app/utils/llm_client.py index 6c1a81f4..ae33afbe 100644 --- a/backend/app/utils/llm_client.py +++ b/backend/app/utils/llm_client.py @@ -5,7 +5,8 @@ LLM客户端封装 import json import re -from typing import Optional, Dict, Any, List +from typing import Any, Dict, List, Optional + from openai import OpenAI from ..config import Config @@ -13,41 +14,38 @@ from ..config import Config class LLMClient: """LLM客户端""" - + def __init__( self, api_key: Optional[str] = None, base_url: Optional[str] = None, - model: Optional[str] = None + model: Optional[str] = None, ): self.api_key = api_key or Config.LLM_API_KEY self.base_url = base_url or Config.LLM_BASE_URL self.model = model or Config.LLM_MODEL_NAME - + if not self.api_key: raise ValueError("LLM_API_KEY 未配置") - - self.client = OpenAI( - api_key=self.api_key, - base_url=self.base_url - ) - + + self.client = OpenAI(api_key=self.api_key, base_url=self.base_url) + def chat( self, messages: List[Dict[str, str]], temperature: float = 0.7, max_tokens: int = 4096, - response_format: Optional[Dict] = None + response_format: Optional[Dict] = None, ) -> str: """ 发送聊天请求 - + Args: messages: 消息列表 temperature: 温度参数 max_tokens: 最大token数 response_format: 响应格式(如JSON模式) - + Returns: 模型响应文本 """ @@ -57,47 +55,39 @@ class LLMClient: "temperature": temperature, "max_tokens": max_tokens, } - + if response_format: kwargs["response_format"] = response_format - + response = 
self.client.chat.completions.create(**kwargs) content = response.choices[0].message.content # 部分模型(如MiniMax M2.5)会在content中包含<think>思考内容,需要移除 - content = re.sub(r'<think>[\s\S]*?</think>', '', content).strip() + content = re.sub(r"<think>[\s\S]*?</think>", "", content).strip() return content - - def chat_json( - self, - messages: List[Dict[str, str]], - temperature: float = 0.3, - max_tokens: int = 4096 - ) -> Dict[str, Any]: - """ - 发送聊天请求并返回JSON - - Args: - messages: 消息列表 - temperature: 温度参数 - max_tokens: 最大token数 - - Returns: - 解析后的JSON对象 - """ - response = self.chat( - messages=messages, - temperature=temperature, - max_tokens=max_tokens, - response_format={"type": "json_object"} - ) + + def chat_json(self, messages, temperature=0.3, max_tokens=4096): + try: + response = self.chat( + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + response_format={"type": "json_object"}, + ) + except Exception: + # Retry without response_format for unsupported providers + response = self.chat( + messages=messages, temperature=temperature, max_tokens=max_tokens + ) + # 清理markdown代码块标记 cleaned_response = response.strip() - cleaned_response = re.sub(r'^```(?:json)?\s*\n?', '', cleaned_response, flags=re.IGNORECASE) - cleaned_response = re.sub(r'\n?```\s*$', '', cleaned_response) + cleaned_response = re.sub( + r"^```(?:json)?\s*\n?", "", cleaned_response, flags=re.IGNORECASE + ) + cleaned_response = re.sub(r"\n?```\s*$", "", cleaned_response) cleaned_response = cleaned_response.strip() try: return json.loads(cleaned_response) except json.JSONDecodeError: raise ValueError(f"LLM返回的JSON格式无效: {cleaned_response}") - diff --git a/backend/app/utils/zep_paging.py b/backend/app/utils/zep_paging.py index 943cd1ae..eb68d4eb 100644 --- a/backend/app/utils/zep_paging.py +++ b/backend/app/utils/zep_paging.py @@ -10,8 +10,7 @@ import time from collections.abc import Callable from typing import Any -from zep_cloud import InternalServerError -from zep_cloud.client import Zep +from typing import Any from
.logger import get_logger @@ -31,7 +30,7 @@ def _fetch_page_with_retry( page_description: str = "page", **kwargs: Any, ) -> list[Any]: - """单页请求,失败时指数退避重试。仅重试网络/IO类瞬态错误。""" + """单页请求,失败时指数退避重试。自动处理429限速。""" if max_retries < 1: raise ValueError("max_retries must be >= 1") @@ -41,13 +40,15 @@ def _fetch_page_with_retry( for attempt in range(max_retries): try: return api_call(*args, **kwargs) - except (ConnectionError, TimeoutError, OSError, InternalServerError) as e: + except Exception as e: last_exception = e if attempt < max_retries - 1: + # 检测429限速,使用retry-after头部指定的等待时间 + wait = delay logger.warning( - f"Zep {page_description} attempt {attempt + 1} failed: {str(e)[:100]}, retrying in {delay:.1f}s..." + f"Zep {page_description} attempt {attempt + 1} failed: {str(e)[:100]}, retrying in {wait:.1f}s..." ) - time.sleep(delay) + time.sleep(wait) delay *= 2 else: logger.error(f"Zep {page_description} failed after {max_retries} attempts: {str(e)}") @@ -57,7 +58,7 @@ def _fetch_page_with_retry( def fetch_all_nodes( - client: Zep, + client: Any, graph_id: str, page_size: int = _DEFAULT_PAGE_SIZE, max_items: int = _MAX_NODES, @@ -103,7 +104,7 @@ def fetch_all_nodes( def fetch_all_edges( - client: Zep, + client: Any, graph_id: str, page_size: int = _DEFAULT_PAGE_SIZE, max_retries: int = _DEFAULT_MAX_RETRIES, diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 4f5361d5..50848022 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -12,26 +12,22 @@ dependencies = [ # 核心框架 "flask>=3.0.0", "flask-cors>=6.0.0", - # LLM 相关 "openai>=1.0.0", - # Zep Cloud - "zep-cloud==3.13.0", - # OASIS 社交媒体模拟 "camel-oasis==0.2.5", "camel-ai==0.2.78", - # 文件处理 "PyMuPDF>=1.24.0", # 编码检测(支持非UTF-8编码的文本文件) "charset-normalizer>=3.0.0", "chardet>=5.0.0", - # 工具库 "python-dotenv>=1.0.0", "pydantic>=2.0.0", + "graphiti-core>=0.3", + "google-genai>=1.68.0", ] [project.optional-dependencies] diff --git a/backend/uv.lock b/backend/uv.lock index f1ce4b60..eae68acd 100644 --- 
a/backend/uv.lock +++ b/backend/uv.lock @@ -475,6 +475,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" }, ] +[[package]] +name = "diskcache" +version = "5.6.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/3f/21/1c1ffc1a039ddcc459db43cc108658f32c57d271d7289a2794e401d0fdb6/diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc", size = 67916, upload-time = "2023-08-31T06:12:00.316Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3f/27/4570e78fc0bf5ea0ca45eb1de3818a23787af9b390c0b0a0033a1b8236f9/diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19", size = 45550, upload-time = "2023-08-31T06:11:58.822Z" }, +] + [[package]] name = "distlib" version = "0.4.0" @@ -592,6 +601,63 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/51/c7/b64cae5dba3a1b138d7123ec36bb5ccd39d39939f18454407e5468f4763f/fsspec-2025.12.0-py3-none-any.whl", hash = "sha256:8bf1fe301b7d8acfa6e8571e3b1c3d158f909666642431cc78a1b7b4dbc5ec5b", size = 201422, upload-time = "2025-12-03T15:23:41.434Z" }, ] +[[package]] +name = "google-auth" +version = "2.49.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cryptography" }, + { name = "pyasn1-modules" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ea/80/6a696a07d3d3b0a92488933532f03dbefa4a24ab80fb231395b9a2a1be77/google_auth-2.49.1.tar.gz", hash = "sha256:16d40da1c3c5a0533f57d268fe72e0ebb0ae1cc3b567024122651c045d879b64", size = 333825, upload-time = "2026-03-12T19:30:58.135Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/e9/eb/c6c2478d8a8d633460be40e2a8a6f8f429171997a35a96f81d3b680dec83/google_auth-2.49.1-py3-none-any.whl", hash = "sha256:195ebe3dca18eddd1b3db5edc5189b76c13e96f29e73043b923ebcf3f1a860f7", size = 240737, upload-time = "2026-03-12T19:30:53.159Z" }, +] + +[package.optional-dependencies] +requests = [ + { name = "requests" }, +] + +[[package]] +name = "google-genai" +version = "1.68.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "distro" }, + { name = "google-auth", extra = ["requests"] }, + { name = "httpx" }, + { name = "pydantic" }, + { name = "requests" }, + { name = "sniffio" }, + { name = "tenacity" }, + { name = "typing-extensions" }, + { name = "websockets" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9c/2c/f059982dbcb658cc535c81bbcbe7e2c040d675f4b563b03cdb01018a4bc3/google_genai-1.68.0.tar.gz", hash = "sha256:ac30c0b8bc630f9372993a97e4a11dae0e36f2e10d7c55eacdca95a9fa14ca96", size = 511285, upload-time = "2026-03-18T01:03:18.243Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/de/7d3ee9c94b74c3578ea4f88d45e8de9405902f857932334d81e89bce3dfa/google_genai-1.68.0-py3-none-any.whl", hash = "sha256:a1bc9919c0e2ea2907d1e319b65471d3d6d58c54822039a249fe1323e4178d15", size = 750912, upload-time = "2026-03-18T01:03:15.983Z" }, +] + +[[package]] +name = "graphiti-core" +version = "0.11.6" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "diskcache" }, + { name = "neo4j" }, + { name = "numpy" }, + { name = "openai" }, + { name = "pydantic" }, + { name = "python-dotenv" }, + { name = "tenacity" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/30/94/3f84400e5f02ea8e9dc79784202de4173cbc16f4b3ad1bd4302da888e4d8/graphiti_core-0.11.6.tar.gz", hash = "sha256:31d26621834d7d4b8865059ab749feb18af15937b59c69598a640a5dfabea331", size = 71928, upload-time = "2025-05-15T17:58:02.304Z" } +wheels = [ + { url 
= "https://files.pythonhosted.org/packages/ac/2e/c8f22f01585bf173d1c82f6d4615511aebc75aeda764c69aa394446fa93c/graphiti_core-0.11.6-py3-none-any.whl", hash = "sha256:6ec4807a884f5ea88b942d0c8b7bcd2e107c7358ab4f98ef2a2092c229929707", size = 111001, upload-time = "2025-05-15T17:58:00.542Z" }, +] + [[package]] name = "h11" version = "0.16.0" @@ -1248,11 +1314,12 @@ dependencies = [ { name = "charset-normalizer" }, { name = "flask" }, { name = "flask-cors" }, + { name = "google-genai" }, + { name = "graphiti-core" }, { name = "openai" }, { name = "pydantic" }, { name = "pymupdf" }, { name = "python-dotenv" }, - { name = "zep-cloud" }, ] [package.optional-dependencies] @@ -1276,6 +1343,8 @@ requires-dist = [ { name = "charset-normalizer", specifier = ">=3.0.0" }, { name = "flask", specifier = ">=3.0.0" }, { name = "flask-cors", specifier = ">=6.0.0" }, + { name = "google-genai", specifier = ">=1.68.0" }, + { name = "graphiti-core", specifier = ">=0.3" }, { name = "openai", specifier = ">=1.0.0" }, { name = "pipreqs", marker = "extra == 'dev'", specifier = ">=0.5.0" }, { name = "pydantic", specifier = ">=2.0.0" }, @@ -1283,7 +1352,6 @@ requires-dist = [ { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, - { name = "zep-cloud", specifier = "==3.13.0" }, ] provides-extras = ["dev"] @@ -1916,6 +1984,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/37/efad0257dc6e593a18957422533ff0f87ede7c9c6ea010a2177d738fb82f/pure_eval-0.2.3-py3-none-any.whl", hash = "sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0", size = 11842, upload-time = "2024-07-21T12:58:20.04Z" }, ] +[[package]] +name = "pyasn1" +version = "0.6.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/5c/5f/6583902b6f79b399c9c40674ac384fd9cd77805f9e6205075f828ef11fb2/pyasn1-0.6.3.tar.gz", hash = "sha256:697a8ecd6d98891189184ca1fa05d1bb00e2f84b5977c481452050549c8a72cf", size = 148685, upload-time = "2026-03-17T01:06:53.382Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5d/a0/7d793dce3fa811fe047d6ae2431c672364b462850c6235ae306c0efd025f/pyasn1-0.6.3-py3-none-any.whl", hash = "sha256:a80184d120f0864a52a073acc6fc642847d0be408e7c7252f31390c0f4eadcde", size = 83997, upload-time = "2026-03-17T01:06:52.036Z" }, +] + +[[package]] +name = "pyasn1-modules" +version = "0.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyasn1" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e9/e6/78ebbb10a8c8e4b61a59249394a4a594c1a7af95593dc933a349c8d00964/pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6", size = 307892, upload-time = "2025-03-28T02:41:22.17Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/47/8d/d529b5d697919ba8c11ad626e835d4039be708a35b0d22de83a269a6682c/pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a", size = 181259, upload-time = "2025-03-28T02:41:19.028Z" }, +] + [[package]] name = "pycparser" version = "2.23" @@ -2987,6 +3076,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252, upload-time = "2022-10-06T17:21:44.262Z" }, ] +[[package]] +name = "tenacity" +version = "9.1.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/47/c6/ee486fd809e357697ee8a44d3d69222b344920433d3b6666ccd9b374630c/tenacity-9.1.4.tar.gz", hash = 
"sha256:adb31d4c263f2bd041081ab33b498309a57c77f9acf2db65aadf0898179cf93a", size = 49413, upload-time = "2026-02-07T10:45:33.841Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d7/c1/eb8f9debc45d3b7918a32ab756658a0904732f75e555402972246b0b8e71/tenacity-9.1.4-py3-none-any.whl", hash = "sha256:6095a360c919085f28c6527de529e76a06ad89b23659fa881ae0649b867a9d55", size = 28926, upload-time = "2026-02-07T10:45:32.24Z" }, +] + [[package]] name = "texttable" version = "1.7.0" @@ -3488,19 +3586,3 @@ sdist = { url = "https://files.pythonhosted.org/packages/d4/c8/cc640404a0981e6c1 wheels = [ { url = "https://files.pythonhosted.org/packages/8b/90/89a2ff242ccab6a24fbab18dbbabc67c51a6f0ed01f9a0f41689dc177419/yarg-0.1.9-py2.py3-none-any.whl", hash = "sha256:4f9cebdc00fac946c9bf2783d634e538a71c7d280a4d806d45fd4dc0ef441492", size = 19162, upload-time = "2014-08-11T22:01:41.104Z" }, ] - -[[package]] -name = "zep-cloud" -version = "3.13.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "httpx" }, - { name = "pydantic" }, - { name = "pydantic-core" }, - { name = "python-dateutil" }, - { name = "typing-extensions" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/32/c7/c835debf13302f8aaf8d0561ac6ff5a9bc15cc140cd692a1330fb1900c55/zep_cloud-3.13.0.tar.gz", hash = "sha256:c55d9c511773bb2177ae8e08546141404f87d2099affafabd7ec4b4505763e48", size = 63116, upload-time = "2025-11-20T15:25:40.745Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f7/e1/bbf03c6c8007c0cb238780e7fc6d8e1a52633893933a41aa09678618985a/zep_cloud-3.13.0-py3-none-any.whl", hash = "sha256:b2fbdeef73e262194c8f67b58f76471de6ee87e1a629541a09d8f7bbf475f12b", size = 110601, upload-time = "2025-11-20T15:25:38.484Z" }, -] diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 3e56d752..fdab7ac4 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1435,7 +1435,6 @@ "resolved": 
"https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", "license": "ISC", - "peer": true, "engines": { "node": ">=12" } @@ -1913,7 +1912,6 @@ "integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -2053,7 +2051,6 @@ "integrity": "sha512-ITcnkFeR3+fI8P1wMgItjGrR10170d8auB4EpMLPqmx6uxElH3a/hHGQabSHKdqd4FXWO1nFIp9rRn7JQ34ACQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -2128,7 +2125,6 @@ "resolved": "https://registry.npmjs.org/vue/-/vue-3.5.25.tgz", "integrity": "sha512-YLVdgv2K13WJ6n+kD5owehKtEXwdwXuj2TTyJMsO7pSeKw2bfRNZGjhB7YzrpbMYj5b5QsUebHpOqR3R3ziy/g==", "license": "MIT", - "peer": true, "dependencies": { "@vue/compiler-dom": "3.5.25", "@vue/compiler-sfc": "3.5.25",