Merge remote-tracking branch 'abhiyadav2345/feat/graphiti-neo4j-migration'

This commit is contained in:
stg 2026-05-05 15:03:47 +02:00
commit 62648289d1
18 changed files with 1395 additions and 427 deletions

86
CLAUDE.md Normal file
View File

@ -0,0 +1,86 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
MiroFish is a multi-agent swarm intelligence prediction engine. It builds knowledge graphs from seed data, simulates thousands of AI agents interacting on virtual Twitter/Reddit platforms (via CAMEL-OASIS), and generates analytical reports — all to predict outcomes of real-world scenarios.
## Commands
### Setup
```bash
npm run setup:all # Install all dependencies (frontend + backend)
npm run setup # Frontend npm install only
npm run setup:backend # Backend: uv sync (Python deps)
```
### Development
```bash
npm run dev # Run backend + frontend concurrently
npm run backend # Backend only: Flask on port 5001
npm run frontend # Frontend only: Vite on port 3000
```
### Build
```bash
npm run build # Build frontend (Vite)
```
### Backend (Python)
```bash
cd backend && uv run python run.py # Start Flask server
cd backend && uv run python -m pytest # Run tests (if any)
```
### Docker
```bash
docker-compose up # Full stack via Docker
```
## Architecture
### Stack
- **Backend**: Python ≥3.11 Flask 3.0, managed by `uv`
- **Frontend**: Vue 3 + Vite, port 3000; proxies `/api` → port 5001
- **LLM**: OpenAI SDK-compatible (default: Qwen via `dashscope`; also works with GLM, OpenAI)
- **Memory/Graph**: Zep Cloud (knowledge graph for entity storage and retrieval)
- **Simulation**: CAMEL-OASIS (multi-agent Twitter + Reddit simulation)
- **Visualization**: D3.js
### Required Environment Variables
Copy `.env.example` to `.env`:
```
LLM_API_KEY # Required
LLM_BASE_URL # Default: https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_MODEL_NAME # Default: qwen-plus
ZEP_API_KEY # Required (Zep Cloud)
```
### 5-Step Pipeline
The core workflow is a sequential async pipeline:
1. **Graph Build** — Upload files → LLM extracts ontology → Zep Cloud builds knowledge graph
2. **Env Setup** — Read Zep entities → Generate OASIS agent profiles (AI personalities)
3. **Simulation** — CAMEL-OASIS runs agents on dual platforms (Twitter + Reddit) in parallel
4. **Report** — ReportAgent (ReACT loop) queries graph with tools: `SearchResult`, `InsightForge`, `Panorama`, `Interview`
5. **Interaction** — Chat with simulated agents or the ReportAgent
### Backend Structure (`backend/app/`)
- `api/` — Flask blueprints: `graph_bp`, `simulation_bp`, `report_bp`
- `services/` — Core logic: graph building, simulation runner, report agent, Zep tools
- `models/``Project` and `Task` state objects (in-memory, JSON-serializable)
- `utils/` — LLM client wrapper, file parser, retry logic, Zep pagination
- `config/config.py` — All configuration (LLM, Zep, chunking, simulation params)
Long-running operations (ontology generation, graph build, profile generation, report generation) run as background tasks tracked via `Task` objects with progress polling.
### Frontend Structure (`frontend/src/`)
- `views/` — Page components mapped to routes; `Process.vue` is the main 50KB workflow orchestrator
- `components/``Step1-5` step components + `GraphPanel.vue` (D3 graph visualization)
- `api/` — Axios services (`graph.js`, `simulation.js`, `report.js`) with 5-min timeout and exponential retry
### Key Implementation Details
- Reasoning model outputs (e.g., MiniMax/GLM with `<think>` tags or markdown code fences) are stripped before processing — see recent fix in commit `985f89f`
- Simulation state is managed in `SimulationManager`; IPC between processes via `simulation_ipc.py`
- Interview/chat with agents uses prefix injection to suppress tool calls in responses
- Default simulation: max 10 rounds, Twitter actions include CREATE_POST/LIKE/REPOST/FOLLOW/QUOTE/DO_NOTHING; Reddit adds CREATE_COMMENT/DISLIKE

227
README-EN.md Normal file
View File

@ -0,0 +1,227 @@
<div align="center">
<img src="./static/image/MiroFish_logo_compressed.jpeg" alt="MiroFish Logo" width="75%"/>
<a href="https://trendshift.io/repositories/16144" target="_blank"><img src="https://trendshift.io/api/badge/repositories/16144" alt="666ghj%2FMiroFish | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>
简洁通用的群体智能引擎,预测万物
</br>
<em>A Simple and Universal Swarm Intelligence Engine, Predicting Anything</em>
<a href="https://www.shanda.com/" target="_blank"><img src="./static/image/shanda_logo.png" alt="666ghj%2MiroFish | Shanda" height="40"/></a>
[![GitHub Stars](https://img.shields.io/github/stars/666ghj/MiroFish?style=flat-square&color=DAA520)](https://github.com/666ghj/MiroFish/stargazers)
[![GitHub Watchers](https://img.shields.io/github/watchers/666ghj/MiroFish?style=flat-square)](https://github.com/666ghj/MiroFish/watchers)
[![GitHub Forks](https://img.shields.io/github/forks/666ghj/MiroFish?style=flat-square)](https://github.com/666ghj/MiroFish/network)
[![Docker](https://img.shields.io/badge/Docker-Build-2496ED?style=flat-square&logo=docker&logoColor=white)](https://hub.docker.com/)
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/666ghj/MiroFish)
[![Discord](https://img.shields.io/badge/Discord-Join-5865F2?style=flat-square&logo=discord&logoColor=white)](http://discord.gg/ePf5aPaHnA)
[![X](https://img.shields.io/badge/X-Follow-000000?style=flat-square&logo=x&logoColor=white)](https://x.com/mirofish_ai)
[![Instagram](https://img.shields.io/badge/Instagram-Follow-E4405F?style=flat-square&logo=instagram&logoColor=white)](https://www.instagram.com/mirofish_ai/)
[English](./README-EN.md) | [中文文档](./README.md)
</div>
## ⚡ Overview
**MiroFish** is a next-generation AI prediction engine powered by multi-agent technology. By extracting seed information from the real world (such as breaking news, policy drafts, or financial signals), it automatically constructs a high-fidelity parallel digital world. Within this space, thousands of intelligent agents with independent personalities, long-term memory, and behavioral logic freely interact and undergo social evolution. You can inject variables dynamically from a "God's-eye view" to precisely deduce future trajectories — **rehearse the future in a digital sandbox, and win decisions after countless simulations**.
> You only need to: Upload seed materials (data analysis reports or interesting novel stories) and describe your prediction requirements in natural language</br>
> MiroFish will return: A detailed prediction report and a deeply interactive high-fidelity digital world
### Our Vision
MiroFish is dedicated to creating a swarm intelligence mirror that maps reality. By capturing the collective emergence triggered by individual interactions, we break through the limitations of traditional prediction:
- **At the Macro Level**: We are a rehearsal laboratory for decision-makers, allowing policies and public relations to be tested at zero risk
- **At the Micro Level**: We are a creative sandbox for individual users — whether deducing novel endings or exploring imaginative scenarios, everything can be fun, playful, and accessible
From serious predictions to playful simulations, we let every "what if" see its outcome, making it possible to predict anything.
## 🌐 Live Demo
Welcome to visit our online demo environment and experience a prediction simulation on trending public opinion events we've prepared for you: [mirofish-live-demo](https://666ghj.github.io/mirofish-demo/)
## 📸 Screenshots
<div align="center">
<table>
<tr>
<td><img src="./static/image/Screenshot/运行截图1.png" alt="Screenshot 1" width="100%"/></td>
<td><img src="./static/image/Screenshot/运行截图2.png" alt="Screenshot 2" width="100%"/></td>
</tr>
<tr>
<td><img src="./static/image/Screenshot/运行截图3.png" alt="Screenshot 3" width="100%"/></td>
<td><img src="./static/image/Screenshot/运行截图4.png" alt="Screenshot 4" width="100%"/></td>
</tr>
<tr>
<td><img src="./static/image/Screenshot/运行截图5.png" alt="Screenshot 5" width="100%"/></td>
<td><img src="./static/image/Screenshot/运行截图6.png" alt="Screenshot 6" width="100%"/></td>
</tr>
</table>
</div>
## 🎬 Demo Videos
### 1. Wuhan University Public Opinion Simulation + MiroFish Project Introduction
<div align="center">
<a href="https://www.bilibili.com/video/BV1VYBsBHEMY/" target="_blank"><img src="./static/image/武大模拟演示封面.png" alt="MiroFish Demo Video" width="75%"/></a>
Click the image to watch the complete demo video for prediction using BettaFish-generated "Wuhan University Public Opinion Report"
</div>
### 2. Dream of the Red Chamber Lost Ending Simulation
<div align="center">
<a href="https://www.bilibili.com/video/BV1cPk3BBExq" target="_blank"><img src="./static/image/红楼梦模拟推演封面.jpg" alt="MiroFish Demo Video" width="75%"/></a>
Click the image to watch MiroFish's deep prediction of the lost ending based on hundreds of thousands of words from the first 80 chapters of "Dream of the Red Chamber"
</div>
> **Financial Prediction**, **Political News Prediction** and more examples coming soon...
## 🔄 Workflow
1. **Graph Building**: Seed extraction & Individual/collective memory injection & GraphRAG construction
2. **Environment Setup**: Entity relationship extraction & Persona generation & Agent configuration injection
3. **Simulation**: Dual-platform parallel simulation & Auto-parse prediction requirements & Dynamic temporal memory updates
4. **Report Generation**: ReportAgent with rich toolset for deep interaction with post-simulation environment
5. **Deep Interaction**: Chat with any agent in the simulated world & Interact with ReportAgent
## 🚀 Quick Start
### Option 1: Source Code Deployment (Recommended)
#### Prerequisites
| Tool | Version | Description | Check Installation |
|------|---------|-------------|-------------------|
| **Node.js** | 18+ | Frontend runtime, includes npm | `node -v` |
| **Python** | ≥3.11, ≤3.12 | Backend runtime | `python --version` |
| **uv** | Latest | Python package manager | `uv --version` |
| **Neo4j** | 5.x Community | Local knowledge graph database | `neo4j --version` |
**Install Neo4j (choose one):**
```bash
# macOS
brew install neo4j
# Linux (Debian/Ubuntu)
# See official docs: https://neo4j.com/docs/operations-manual/current/installation/linux/
# Windows / All platforms
# Download Neo4j Desktop: https://neo4j.com/download/
# Set password before first start, then launch
neo4j-admin dbms set-initial-password your_neo4j_password
neo4j start
```
#### 1. Configure Environment Variables
```bash
# Copy the example configuration file
cp .env.example .env
# Edit the .env file and fill in the required API keys
```
**Required Environment Variables:**
```env
# LLM API Configuration (supports any LLM API with OpenAI SDK format)
# Recommended: Alibaba Qwen-plus model via Bailian Platform: https://bailian.console.aliyun.com/
# High consumption, try simulations with fewer than 40 rounds first
LLM_API_KEY=your_api_key
LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_MODEL_NAME=qwen-plus
# Knowledge Graph — local Neo4j + Graphiti (free, no rate limits)
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password
# Embedding model (uncomment if using Gemini API)
# EMBEDDING_MODEL=gemini-embedding-001
```
> **Note:** MiroFish has migrated from Zep Cloud to local **Graphiti + Neo4j**. No third-party account required — completely free with no rate limits.
#### 2. Install Dependencies
```bash
# One-click installation of all dependencies (root + frontend + backend)
npm run setup:all
```
Or install step by step:
```bash
# Install Node dependencies (root + frontend)
npm run setup
# Install Python dependencies (backend, auto-creates virtual environment)
npm run setup:backend
```
#### 3. Start Services
```bash
# Start both frontend and backend (run from project root)
npm run dev
```
**Service URLs:**
- Frontend: `http://localhost:3000`
- Backend API: `http://localhost:5001`
**Start Individually:**
```bash
npm run backend # Start backend only
npm run frontend # Start frontend only
```
### Option 2: Docker Deployment
```bash
# 1. Configure environment variables (same as source deployment)
cp .env.example .env
# 2. Pull image and start
docker compose up -d
```
Reads `.env` from root directory by default, maps ports `3000 (frontend) / 5001 (backend)`
> Mirror address for faster pulling is provided as comments in `docker-compose.yml`, replace if needed.
## 📬 Join the Conversation
<div align="center">
<img src="./static/image/QQ群.png" alt="QQ Group" width="60%"/>
</div>
&nbsp;
The MiroFish team is recruiting full-time/internship positions. If you're interested in multi-agent simulation and LLM applications, feel free to send your resume to: **mirofish@shanda.com**
## 📄 Acknowledgments
**MiroFish has received strategic support and incubation from Shanda Group!**
MiroFish's simulation engine is powered by **[OASIS (Open Agent Social Interaction Simulations)](https://github.com/camel-ai/oasis)**, We sincerely thank the CAMEL-AI team for their open-source contributions!
## 📈 Project Statistics
<a href="https://www.star-history.com/#666ghj/MiroFish&type=date&legend=top-left">
<picture>
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=666ghj/MiroFish&type=date&theme=dark&legend=top-left" />
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=666ghj/MiroFish&type=date&legend=top-left" />
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=666ghj/MiroFish&type=date&legend=top-left" />
</picture>
</a>

View File

@ -97,11 +97,37 @@ Click the image to watch MiroFish's deep prediction of the lost ending based on
#### Prerequisites
<<<<<<< HEAD
| Tool | Version | Description | Check Installation |
|------|---------|-------------|-------------------|
| **Node.js** | 18+ | Frontend runtime, includes npm | `node -v` |
| **Python** | ≥3.11, ≤3.12 | Backend runtime | `python --version` |
| **uv** | Latest | Python package manager | `uv --version` |
=======
| 工具 | 版本要求 | 说明 | 安装检查 |
|------|---------|------|---------|
| **Node.js** | 18+ | 前端运行环境,包含 npm | `node -v` |
| **Python** | ≥3.11, ≤3.12 | 后端运行环境 | `python --version` |
| **uv** | 最新版 | Python 包管理器 | `uv --version` |
| **Neo4j** | 5.x Community | 本地知识图谱数据库 | `neo4j --version` |
**安装 Neo4j选择适合你的方式**
```bash
# macOS
brew install neo4j
# Linux (Debian/Ubuntu)
# 参考官方文档https://neo4j.com/docs/operations-manual/current/installation/linux/
# Windows / 所有平台
# 下载 Desktop 版本https://neo4j.com/download/
# 首次启动前设置密码,然后启动服务
neo4j-admin dbms set-initial-password your_neo4j_password
neo4j start
```
>>>>>>> abhiyadav2345/feat/graphiti-neo4j-migration
#### 1. Configure Environment Variables
@ -122,12 +148,29 @@ LLM_API_KEY=your_api_key
LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_MODEL_NAME=qwen-plus
<<<<<<< HEAD
# Zep Cloud Configuration
# Free monthly quota is sufficient for simple usage: https://app.getzep.com/
ZEP_API_KEY=your_zep_api_key
```
#### 2. Install Dependencies
=======
# 知识图谱配置(本地 Neo4j + Graphiti免费无限制
# 安装 Neo4j Community Editionhttps://neo4j.com/download/
# macOS 用户brew install neo4j && neo4j start
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password
# Embedding 模型(使用 Gemini API 时取消注释)
# EMBEDDING_MODEL=gemini-embedding-001
```
> **注意:** MiroFish 已从 Zep Cloud 迁移至本地 **Graphiti + Neo4j**,无需注册任何第三方服务,完全免费且无速率限制。
#### 2. 安装依赖
>>>>>>> abhiyadav2345/feat/graphiti-neo4j-migration
```bash
# One-click installation of all dependencies (root + frontend + backend)

View File

@ -73,8 +73,36 @@ def create_app(config_class=Config):
def health():
return {'status': 'ok', 'service': 'MiroFish Backend'}
# On startup: recover any projects stuck in graph_building (task was killed by restart)
if should_log_startup:
_recover_stuck_projects()
if should_log_startup:
logger.info("MiroFish Backend 启动完成")
return app
def _recover_stuck_projects():
"""Mark graph_building projects as completed if Neo4j already has their data."""
from .models.project import ProjectManager, ProjectStatus
from .utils.logger import get_logger as _get_logger
_log = _get_logger('mirofish.startup')
try:
for p in ProjectManager.list_projects():
if p.status == ProjectStatus.GRAPH_BUILDING and p.graph_id:
from .services.graphiti_adapter import _get_graphiti, _run, _neo4j_query
g = _get_graphiti()
r = _run(_neo4j_query(g,
'MATCH (n:Entity {group_id: $gid}) RETURN count(n) AS n',
{'gid': p.graph_id}
))
node_count = int(r[0]['n']) if r else 0
if node_count > 0:
p.status = ProjectStatus.GRAPH_COMPLETED
p.graph_build_task_id = None
ProjectManager.save_project(p)
_log.info(f"Recovered stuck project {p.project_id}: {node_count} nodes found, marked graph_completed")
except Exception as e:
_get_logger('mirofish.startup').warning(f"Startup recovery failed: {e}")

View File

@ -4,6 +4,7 @@
"""
import os
import time
import traceback
import threading
from flask import request, jsonify
@ -15,10 +16,15 @@ from ..services.graph_builder import GraphBuilderService
from ..services.text_processor import TextProcessor
from ..utils.file_parser import FileParser
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
from ..models.task import TaskManager, TaskStatus
from ..models.project import ProjectManager, ProjectStatus
# In-memory cache for graph data to avoid hammering Zep's rate-limited API.
# Stale cache is served instantly on 429; a background thread refreshes it.
_graph_data_cache: dict = {} # graph_id -> {"data": ..., "ts": float}
_graph_refresh_locks: dict = {} # graph_id -> threading.Lock (one refresh at a time)
_GRAPH_CACHE_TTL = 300 # seconds before triggering a background refresh
# 获取日志器
logger = get_logger('mirofish.api')
@ -43,9 +49,9 @@ def get_project(project_id: str):
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=project_id)
"error": f"项目不存在: {project_id}"
}), 404
return jsonify({
"success": True,
"data": project.to_dict()
@ -77,12 +83,12 @@ def delete_project(project_id: str):
if not success:
return jsonify({
"success": False,
"error": t('api.projectDeleteFailed', id=project_id)
"error": f"项目不存在或删除失败: {project_id}"
}), 404
return jsonify({
"success": True,
"message": t('api.projectDeleted', id=project_id)
"message": f"项目已删除: {project_id}"
})
@ -96,9 +102,9 @@ def reset_project(project_id: str):
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=project_id)
"error": f"项目不存在: {project_id}"
}), 404
# 重置到本体已生成状态
if project.ontology:
project.status = ProjectStatus.ONTOLOGY_GENERATED
@ -112,7 +118,7 @@ def reset_project(project_id: str):
return jsonify({
"success": True,
"message": t('api.projectReset', id=project_id),
"message": f"项目已重置: {project_id}",
"data": project.to_dict()
})
@ -161,7 +167,7 @@ def generate_ontology():
if not simulation_requirement:
return jsonify({
"success": False,
"error": t('api.requireSimulationRequirement')
"error": "请提供模拟需求描述 (simulation_requirement)"
}), 400
# 获取上传的文件
@ -169,7 +175,7 @@ def generate_ontology():
if not uploaded_files or all(not f.filename for f in uploaded_files):
return jsonify({
"success": False,
"error": t('api.requireFileUpload')
"error": "请至少上传一个文档文件"
}), 400
# 创建项目
@ -204,7 +210,7 @@ def generate_ontology():
ProjectManager.delete_project(project.project_id)
return jsonify({
"success": False,
"error": t('api.noDocProcessed')
"error": "没有成功处理任何文档,请检查文件格式"
}), 400
# 保存提取的文本
@ -285,13 +291,13 @@ def build_graph():
# 检查配置
errors = []
if not Config.ZEP_API_KEY:
errors.append(t('api.zepApiKeyMissing'))
if not Config.NEO4J_PASSWORD:
errors.append("NEO4J未配置")
if errors:
logger.error(f"配置错误: {errors}")
return jsonify({
"success": False,
"error": t('api.configError', details="; ".join(errors))
"error": "配置错误: " + "; ".join(errors)
}), 500
# 解析请求
@ -302,7 +308,7 @@ def build_graph():
if not project_id:
return jsonify({
"success": False,
"error": t('api.requireProjectId')
"error": "请提供 project_id"
}), 400
# 获取项目
@ -310,22 +316,22 @@ def build_graph():
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=project_id)
"error": f"项目不存在: {project_id}"
}), 404
# 检查项目状态
force = data.get('force', False) # 强制重新构建
if project.status == ProjectStatus.CREATED:
return jsonify({
"success": False,
"error": t('api.ontologyNotGenerated')
"error": "项目尚未生成本体,请先调用 /ontology/generate"
}), 400
if project.status == ProjectStatus.GRAPH_BUILDING and not force:
return jsonify({
"success": False,
"error": t('api.graphBuilding'),
"error": "图谱正在构建中,请勿重复提交。如需强制重建,请添加 force: true",
"task_id": project.graph_build_task_id
}), 400
@ -350,7 +356,7 @@ def build_graph():
if not text:
return jsonify({
"success": False,
"error": t('api.textNotFound')
"error": "未找到提取的文本内容"
}), 400
# 获取本体
@ -358,7 +364,7 @@ def build_graph():
if not ontology:
return jsonify({
"success": False,
"error": t('api.ontologyNotFound')
"error": "未找到本体定义"
}), 400
# 创建异步任务
@ -371,28 +377,24 @@ def build_graph():
project.graph_build_task_id = task_id
ProjectManager.save_project(project)
# Capture locale before spawning background thread
current_locale = get_locale()
# 启动后台任务
def build_task():
set_locale(current_locale)
build_logger = get_logger('mirofish.build')
try:
build_logger.info(f"[{task_id}] 开始构建图谱...")
task_manager.update_task(
task_id,
status=TaskStatus.PROCESSING,
message=t('progress.initGraphService')
message="初始化图谱构建服务..."
)
# 创建图谱构建服务
builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
builder = GraphBuilderService()
# 分块
task_manager.update_task(
task_id,
message=t('progress.textChunking'),
message="文本分块中...",
progress=5
)
chunks = TextProcessor.split_text(
@ -405,7 +407,7 @@ def build_graph():
# 创建图谱
task_manager.update_task(
task_id,
message=t('progress.creatingZepGraph'),
message="创建Zep图谱...",
progress=10
)
graph_id = builder.create_graph(name=graph_name)
@ -417,7 +419,7 @@ def build_graph():
# 设置本体
task_manager.update_task(
task_id,
message=t('progress.settingOntology'),
message="设置本体定义...",
progress=15
)
builder.set_ontology(graph_id, ontology)
@ -431,23 +433,36 @@ def build_graph():
progress=progress
)
task_manager.update_task(
task_id,
message=t('progress.addingChunks', count=total_chunks),
progress=15
)
# Count already-processed episodes to resume after a restart
from app.services.graphiti_adapter import _get_graphiti, _run, _neo4j_query
try:
g = _get_graphiti()
ep_count = _run(_neo4j_query(g,
'MATCH (e:Episodic {group_id: $gid}) RETURN count(e) AS n',
{'gid': graph_id}
))
already_done = int(ep_count[0]['n']) if ep_count else 0
except Exception:
already_done = 0
skip_chunks = already_done
remaining = total_chunks - skip_chunks
msg_start = (f"断点续传:跳过 {skip_chunks} 个已处理块,继续处理 {remaining} 块..."
if skip_chunks > 0 else f"开始添加 {total_chunks} 个文本块...")
task_manager.update_task(task_id, message=msg_start, progress=15)
episode_uuids = builder.add_text_batches(
graph_id,
graph_id,
chunks,
batch_size=3,
progress_callback=add_progress_callback
progress_callback=add_progress_callback,
skip_chunks=skip_chunks,
)
# 等待Zep处理完成查询每个episode的processed状态
task_manager.update_task(
task_id,
message=t('progress.waitingZepProcess'),
message="等待Zep处理数据...",
progress=55
)
@ -464,7 +479,7 @@ def build_graph():
# 获取图谱数据
task_manager.update_task(
task_id,
message=t('progress.fetchingGraphData'),
message="获取图谱数据...",
progress=95
)
graph_data = builder.get_graph_data(graph_id)
@ -481,7 +496,7 @@ def build_graph():
task_manager.update_task(
task_id,
status=TaskStatus.COMPLETED,
message=t('progress.graphBuildComplete'),
message="图谱构建完成",
progress=100,
result={
"project_id": project_id,
@ -504,7 +519,7 @@ def build_graph():
task_manager.update_task(
task_id,
status=TaskStatus.FAILED,
message=t('progress.buildFailed', error=str(e)),
message=f"构建失败: {str(e)}",
error=traceback.format_exc()
)
@ -517,7 +532,7 @@ def build_graph():
"data": {
"project_id": project_id,
"task_id": task_id,
"message": t('api.graphBuildStarted', taskId=task_id)
"message": "图谱构建任务已启动,请通过 /task/{task_id} 查询进度"
}
})
@ -541,7 +556,7 @@ def get_task(task_id: str):
if not task:
return jsonify({
"success": False,
"error": t('api.taskNotFound', id=task_id)
"error": f"任务不存在: {task_id}"
}), 404
return jsonify({
@ -566,32 +581,59 @@ def list_tasks():
# ============== 图谱数据接口 ==============
def _refresh_graph_cache(graph_id: str):
"""Background thread: fetch graph data from Neo4j and update cache."""
lock = _graph_refresh_locks.setdefault(graph_id, threading.Lock())
if not lock.acquire(blocking=False):
return # another refresh already in progress
try:
# Look up ontology from the project that owns this graph_id
ontology = None
for project in ProjectManager.list_projects():
if project.graph_id == graph_id and project.ontology:
ontology = project.ontology
break
builder = GraphBuilderService()
graph_data = builder.get_graph_data(graph_id, ontology=ontology)
_graph_data_cache[graph_id] = {"data": graph_data, "ts": time.time()}
logger.info(f"Graph cache refreshed for {graph_id}")
except Exception as e:
logger.warning(f"Background graph cache refresh failed for {graph_id}: {str(e)[:100]}")
finally:
lock.release()
@graph_bp.route('/data/<graph_id>', methods=['GET'])
def get_graph_data(graph_id: str):
"""
获取图谱数据节点和边
获取图谱数据节点和边
- 有缓存且未过期直接返回缓存不调用 Zep
- 有缓存但已过期立即返回旧缓存后台异步刷新
- 无缓存后台线程拉取返回 202 让前端稍后重试
"""
try:
if not Config.ZEP_API_KEY:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
}), 500
builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
graph_data = builder.get_graph_data(graph_id)
return jsonify({
"success": True,
"data": graph_data
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}), 500
if not Config.NEO4J_PASSWORD:
return jsonify({"success": False, "error": "NEO4J未配置"}), 500
cached = _graph_data_cache.get(graph_id)
age = time.time() - cached["ts"] if cached else None
if cached and age < _GRAPH_CACHE_TTL:
# Fresh cache — return immediately
return jsonify({"success": True, "data": cached["data"], "cached": True})
if cached:
# Stale cache — serve it immediately, refresh in background
threading.Thread(target=_refresh_graph_cache, args=(graph_id,), daemon=True).start()
return jsonify({"success": True, "data": cached["data"], "cached": True, "stale": True})
# No cache at all — kick off background fetch, tell frontend to retry
threading.Thread(target=_refresh_graph_cache, args=(graph_id,), daemon=True).start()
return jsonify({
"success": False,
"error": "Graph data is loading, please retry in a moment.",
"retry": True
}), 202
@graph_bp.route('/delete/<graph_id>', methods=['DELETE'])
@ -600,18 +642,18 @@ def delete_graph(graph_id: str):
删除Zep图谱
"""
try:
if not Config.ZEP_API_KEY:
if not Config.NEO4J_PASSWORD:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
"error": "NEO4J未配置"
}), 500
builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
builder = GraphBuilderService()
builder.delete_graph(graph_id)
return jsonify({
"success": True,
"message": t('api.graphDeleted', id=graph_id)
"message": f"图谱已删除: {graph_id}"
})
except Exception as e:

View File

@ -14,7 +14,6 @@ from ..services.oasis_profile_generator import OasisProfileGenerator
from ..services.simulation_manager import SimulationManager, SimulationStatus
from ..services.simulation_runner import SimulationRunner, RunnerStatus
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
from ..models.project import ProjectManager
logger = get_logger('mirofish.api.simulation')
@ -57,10 +56,10 @@ def get_graph_entities(graph_id: str):
enrich: 是否获取相关边信息默认true
"""
try:
if not Config.ZEP_API_KEY:
if not Config.NEO4J_PASSWORD:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
"error": "NEO4J未配置"
}), 500
entity_types_str = request.args.get('entity_types', '')
@ -94,10 +93,10 @@ def get_graph_entities(graph_id: str):
def get_entity_detail(graph_id: str, entity_uuid: str):
"""获取单个实体的详细信息"""
try:
if not Config.ZEP_API_KEY:
if not Config.NEO4J_PASSWORD:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
"error": "NEO4J未配置"
}), 500
reader = ZepEntityReader()
@ -106,7 +105,7 @@ def get_entity_detail(graph_id: str, entity_uuid: str):
if not entity:
return jsonify({
"success": False,
"error": t('api.entityNotFound', id=entity_uuid)
"error": f"实体不存在: {entity_uuid}"
}), 404
return jsonify({
@ -127,10 +126,10 @@ def get_entity_detail(graph_id: str, entity_uuid: str):
def get_entities_by_type(graph_id: str, entity_type: str):
"""获取指定类型的所有实体"""
try:
if not Config.ZEP_API_KEY:
if not Config.NEO4J_PASSWORD:
return jsonify({
"success": False,
"error": t('api.zepApiKeyMissing')
"error": "NEO4J未配置"
}), 500
enrich = request.args.get('enrich', 'true').lower() == 'true'
@ -198,21 +197,21 @@ def create_simulation():
if not project_id:
return jsonify({
"success": False,
"error": t('api.requireProjectId')
"error": "请提供 project_id"
}), 400
project = ProjectManager.get_project(project_id)
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=project_id)
"error": f"项目不存在: {project_id}"
}), 404
graph_id = data.get('graph_id') or project.graph_id
if not graph_id:
return jsonify({
"success": False,
"error": t('api.graphNotBuilt')
"error": "项目尚未构建图谱,请先调用 /api/graph/build"
}), 400
manager = SimulationManager()
@ -409,7 +408,7 @@ def prepare_simulation():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
manager = SimulationManager()
@ -418,7 +417,7 @@ def prepare_simulation():
if not state:
return jsonify({
"success": False,
"error": t('api.simulationNotFound', id=simulation_id)
"error": f"模拟不存在: {simulation_id}"
}), 404
# 检查是否强制重新生成
@ -437,7 +436,7 @@ def prepare_simulation():
"data": {
"simulation_id": simulation_id,
"status": "ready",
"message": t('api.alreadyPrepared'),
"message": "已有完成的准备工作,无需重复生成",
"already_prepared": True,
"prepare_info": prepare_info
}
@ -450,7 +449,7 @@ def prepare_simulation():
if not project:
return jsonify({
"success": False,
"error": t('api.projectNotFound', id=state.project_id)
"error": f"项目不存在: {state.project_id}"
}), 404
# 获取模拟需求
@ -458,7 +457,7 @@ def prepare_simulation():
if not simulation_requirement:
return jsonify({
"success": False,
"error": t('api.projectMissingRequirement')
"error": "项目缺少模拟需求描述 (simulation_requirement)"
}), 400
# 获取文档文本
@ -501,18 +500,14 @@ def prepare_simulation():
state.status = SimulationStatus.PREPARING
manager._save_simulation_state(state)
# Capture locale before spawning background thread
current_locale = get_locale()
# 定义后台任务
def run_prepare():
set_locale(current_locale)
try:
task_manager.update_task(
task_id,
status=TaskStatus.PROCESSING,
progress=0,
message=t('progress.startPreparingEnv')
message="开始准备模拟环境..."
)
# 准备模拟(带进度回调)
@ -533,10 +528,10 @@ def prepare_simulation():
# 构建详细进度信息
stage_names = {
"reading": t('progress.readingGraphEntities'),
"generating_profiles": t('progress.generatingProfiles'),
"generating_config": t('progress.generatingSimConfig'),
"copying_scripts": t('progress.preparingScripts')
"reading": "读取图谱实体",
"generating_profiles": "生成Agent人设",
"generating_config": "生成模拟配置",
"copying_scripts": "准备模拟脚本"
}
stage_index = list(stage_weights.keys()).index(stage) + 1 if stage in stage_weights else 1
@ -617,7 +612,7 @@ def prepare_simulation():
"simulation_id": simulation_id,
"task_id": task_id,
"status": "preparing",
"message": t('api.prepareStarted'),
"message": "准备任务已启动,请通过 /api/simulation/prepare/status 查询进度",
"already_prepared": False,
"expected_entities_count": state.entities_count, # 预期的Agent总数
"entity_types": state.entity_types # 实体类型列表
@ -685,7 +680,7 @@ def get_prepare_status():
"simulation_id": simulation_id,
"status": "ready",
"progress": 100,
"message": t('api.alreadyPrepared'),
"message": "已有完成的准备工作",
"already_prepared": True,
"prepare_info": prepare_info
}
@ -701,13 +696,13 @@ def get_prepare_status():
"simulation_id": simulation_id,
"status": "not_started",
"progress": 0,
"message": t('api.notStartedPrepare'),
"message": "尚未开始准备,请调用 /api/simulation/prepare 开始",
"already_prepared": False
}
})
return jsonify({
"success": False,
"error": t('api.requireTaskOrSimId')
"error": "请提供 task_id 或 simulation_id"
}), 400
task_manager = TaskManager()
@ -725,7 +720,7 @@ def get_prepare_status():
"task_id": task_id,
"status": "ready",
"progress": 100,
"message": t('api.taskCompletedPrepared'),
"message": "任务已完成(准备工作已存在)",
"already_prepared": True,
"prepare_info": prepare_info
}
@ -733,7 +728,7 @@ def get_prepare_status():
return jsonify({
"success": False,
"error": t('api.taskNotFound', id=task_id)
"error": f"任务不存在: {task_id}"
}), 404
task_dict = task.to_dict()
@ -762,7 +757,7 @@ def get_simulation(simulation_id: str):
if not state:
return jsonify({
"success": False,
"error": t('api.simulationNotFound', id=simulation_id)
"error": f"模拟不存在: {simulation_id}"
}), 404
result = state.to_dict()
@ -1066,7 +1061,7 @@ def get_simulation_profiles_realtime(simulation_id: str):
if not os.path.exists(sim_dir):
return jsonify({
"success": False,
"error": t('api.simulationNotFound', id=simulation_id)
"error": f"模拟不存在: {simulation_id}"
}), 404
# 确定文件路径
@ -1169,7 +1164,7 @@ def get_simulation_config_realtime(simulation_id: str):
if not os.path.exists(sim_dir):
return jsonify({
"success": False,
"error": t('api.simulationNotFound', id=simulation_id)
"error": f"模拟不存在: {simulation_id}"
}), 404
# 配置文件路径
@ -1274,7 +1269,7 @@ def get_simulation_config(simulation_id: str):
if not config:
return jsonify({
"success": False,
"error": t('api.configNotFound')
"error": f"模拟配置不存在,请先调用 /prepare 接口"
}), 404
return jsonify({
@ -1302,7 +1297,7 @@ def download_simulation_config(simulation_id: str):
if not os.path.exists(config_path):
return jsonify({
"success": False,
"error": t('api.configFileNotFound')
"error": "配置文件不存在,请先调用 /prepare 接口"
}), 404
return send_file(
@ -1346,7 +1341,7 @@ def download_simulation_script(script_name: str):
if script_name not in allowed_scripts:
return jsonify({
"success": False,
"error": t('api.unknownScript', name=script_name, allowed=allowed_scripts)
"error": f"未知脚本: {script_name},可选: {allowed_scripts}"
}), 400
script_path = os.path.join(scripts_dir, script_name)
@ -1354,7 +1349,7 @@ def download_simulation_script(script_name: str):
if not os.path.exists(script_path):
return jsonify({
"success": False,
"error": t('api.scriptFileNotFound', name=script_name)
"error": f"脚本文件不存在: {script_name}"
}), 404
return send_file(
@ -1394,7 +1389,7 @@ def generate_profiles():
if not graph_id:
return jsonify({
"success": False,
"error": t('api.requireGraphId')
"error": "请提供 graph_id"
}), 400
entity_types = data.get('entity_types')
@ -1411,7 +1406,7 @@ def generate_profiles():
if filtered.filtered_count == 0:
return jsonify({
"success": False,
"error": t('api.noMatchingEntities')
"error": "没有找到符合条件的实体"
}), 400
generator = OasisProfileGenerator()
@ -1496,7 +1491,7 @@ def start_simulation():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
platform = data.get('platform', 'parallel')
@ -1511,18 +1506,18 @@ def start_simulation():
if max_rounds <= 0:
return jsonify({
"success": False,
"error": t('api.maxRoundsPositive')
"error": "max_rounds 必须是正整数"
}), 400
except (ValueError, TypeError):
return jsonify({
"success": False,
"error": t('api.maxRoundsInvalid')
"error": "max_rounds 必须是有效的整数"
}), 400
if platform not in ['twitter', 'reddit', 'parallel']:
return jsonify({
"success": False,
"error": t('api.invalidPlatform', platform=platform)
"error": f"无效的平台类型: {platform},可选: twitter/reddit/parallel"
}), 400
# 检查模拟是否已准备好
@ -1532,7 +1527,7 @@ def start_simulation():
if not state:
return jsonify({
"success": False,
"error": t('api.simulationNotFound', id=simulation_id)
"error": f"模拟不存在: {simulation_id}"
}), 404
force_restarted = False
@ -1559,7 +1554,7 @@ def start_simulation():
else:
return jsonify({
"success": False,
"error": t('api.simRunningForceHint')
"error": f"模拟正在运行中,请先调用 /stop 接口停止,或使用 force=true 强制重新开始"
}), 400
# 如果是强制模式,清理运行日志
@ -1578,7 +1573,7 @@ def start_simulation():
# 准备工作未完成
return jsonify({
"success": False,
"error": t('api.simNotReady', status=state.status.value)
"error": f"模拟未准备好,当前状态: {state.status.value},请先调用 /prepare 接口"
}), 400
# 获取图谱ID用于图谱记忆更新
@ -1595,7 +1590,7 @@ def start_simulation():
if not graph_id:
return jsonify({
"success": False,
"error": t('api.graphIdRequiredForMemory')
"error": "启用图谱记忆更新需要有效的 graph_id请确保项目已构建图谱"
}), 400
logger.info(f"启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}")
@ -1668,7 +1663,7 @@ def stop_simulation():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
run_state = SimulationRunner.stop_simulation(simulation_id)
@ -2016,7 +2011,7 @@ def get_simulation_posts(simulation_id: str):
"platform": platform,
"count": 0,
"posts": [],
"message": t('api.dbNotExist')
"message": "数据库不存在,模拟可能尚未运行"
}
})
@ -2202,33 +2197,33 @@ def interview_agent():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
if agent_id is None:
return jsonify({
"success": False,
"error": t('api.requireAgentId')
"error": "请提供 agent_id"
}), 400
if not prompt:
return jsonify({
"success": False,
"error": t('api.requirePrompt')
"error": "请提供 prompt采访问题"
}), 400
# 验证platform参数
if platform and platform not in ("twitter", "reddit"):
return jsonify({
"success": False,
"error": t('api.invalidInterviewPlatform')
"error": "platform 参数只能是 'twitter''reddit'"
}), 400
# 检查环境状态
if not SimulationRunner.check_env_alive(simulation_id):
return jsonify({
"success": False,
"error": t('api.envNotRunning')
"error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。"
}), 400
# 优化prompt添加前缀避免Agent调用工具
@ -2256,7 +2251,7 @@ def interview_agent():
except TimeoutError as e:
return jsonify({
"success": False,
"error": t('api.interviewTimeout', error=str(e))
"error": f"等待Interview响应超时: {str(e)}"
}), 504
except Exception as e:
@ -2323,20 +2318,20 @@ def interview_agents_batch():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
if not interviews or not isinstance(interviews, list):
return jsonify({
"success": False,
"error": t('api.requireInterviews')
"error": "请提供 interviews采访列表"
}), 400
# 验证platform参数
if platform and platform not in ("twitter", "reddit"):
return jsonify({
"success": False,
"error": t('api.invalidInterviewPlatform')
"error": "platform 参数只能是 'twitter''reddit'"
}), 400
# 验证每个采访项
@ -2344,26 +2339,26 @@ def interview_agents_batch():
if 'agent_id' not in interview:
return jsonify({
"success": False,
"error": t('api.interviewListMissingAgentId', index=i+1)
"error": f"采访列表第{i+1}项缺少 agent_id"
}), 400
if 'prompt' not in interview:
return jsonify({
"success": False,
"error": t('api.interviewListMissingPrompt', index=i+1)
"error": f"采访列表第{i+1}项缺少 prompt"
}), 400
# 验证每项的platform如果有
item_platform = interview.get('platform')
if item_platform and item_platform not in ("twitter", "reddit"):
return jsonify({
"success": False,
"error": t('api.interviewListInvalidPlatform', index=i+1)
"error": f"采访列表第{i+1}项的platform只能是 'twitter''reddit'"
}), 400
# 检查环境状态
if not SimulationRunner.check_env_alive(simulation_id):
return jsonify({
"success": False,
"error": t('api.envNotRunning')
"error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。"
}), 400
# 优化每个采访项的prompt添加前缀避免Agent调用工具
@ -2394,7 +2389,7 @@ def interview_agents_batch():
except TimeoutError as e:
return jsonify({
"success": False,
"error": t('api.batchInterviewTimeout', error=str(e))
"error": f"等待批量Interview响应超时: {str(e)}"
}), 504
except Exception as e:
@ -2450,27 +2445,27 @@ def interview_all_agents():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
if not prompt:
return jsonify({
"success": False,
"error": t('api.requirePrompt')
"error": "请提供 prompt采访问题"
}), 400
# 验证platform参数
if platform and platform not in ("twitter", "reddit"):
return jsonify({
"success": False,
"error": t('api.invalidInterviewPlatform')
"error": "platform 参数只能是 'twitter''reddit'"
}), 400
# 检查环境状态
if not SimulationRunner.check_env_alive(simulation_id):
return jsonify({
"success": False,
"error": t('api.envNotRunning')
"error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。"
}), 400
# 优化prompt添加前缀避免Agent调用工具
@ -2497,7 +2492,7 @@ def interview_all_agents():
except TimeoutError as e:
return jsonify({
"success": False,
"error": t('api.globalInterviewTimeout', error=str(e))
"error": f"等待全局Interview响应超时: {str(e)}"
}), 504
except Exception as e:
@ -2554,7 +2549,7 @@ def get_interview_history():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
history = SimulationRunner.get_interview_history(
@ -2613,7 +2608,7 @@ def get_env_status():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
env_alive = SimulationRunner.check_env_alive(simulation_id)
@ -2622,9 +2617,9 @@ def get_env_status():
env_status = SimulationRunner.get_env_status_detail(simulation_id)
if env_alive:
message = t('api.envRunning')
message = "环境正在运行可以接收Interview命令"
else:
message = t('api.envNotRunningShort')
message = "环境未运行或已关闭"
return jsonify({
"success": True,
@ -2681,7 +2676,7 @@ def close_simulation_env():
if not simulation_id:
return jsonify({
"success": False,
"error": t('api.requireSimulationId')
"error": "请提供 simulation_id"
}), 400
result = SimulationRunner.close_simulation_env(

View File

@ -32,8 +32,15 @@ class Config:
LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'https://api.openai.com/v1')
LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'gpt-4o-mini')
# Zep配置
ZEP_API_KEY = os.environ.get('ZEP_API_KEY')
# Neo4j + Graphiti配置替代 Zep Cloud
NEO4J_URI = os.environ.get('NEO4J_URI', 'bolt://localhost:7687')
NEO4J_USER = os.environ.get('NEO4J_USER', 'neo4j')
NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', 'mirofish123')
# Embedding model — override when using non-OpenAI APIs (e.g. Gemini: text-embedding-004)
EMBEDDING_MODEL = os.environ.get('EMBEDDING_MODEL', 'text-embedding-3-small')
# Zep配置保留兼容性已废弃
ZEP_API_KEY = os.environ.get('ZEP_API_KEY', '')
# 文件上传配置
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB
@ -69,7 +76,7 @@ class Config:
errors = []
if not cls.LLM_API_KEY:
errors.append("LLM_API_KEY 未配置")
if not cls.ZEP_API_KEY:
errors.append("ZEP_API_KEY 未配置")
if not cls.NEO4J_PASSWORD:
errors.append("NEO4J_PASSWORD 未配置")
return errors

View File

@ -10,8 +10,7 @@ import threading
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from zep_cloud.client import Zep
from zep_cloud import EpisodeData, EntityEdgeSourceTarget
from .graphiti_adapter import GraphitiAdapter
from ..config import Config
from ..models.task import TaskManager, TaskStatus
@ -20,6 +19,54 @@ from .text_processor import TextProcessor
from ..utils.locale import t, get_locale, set_locale
def _classify_entity_type(name: str, summary: str, ontology: Optional[Dict]) -> str:
"""
Classify an entity into an ontology type using keyword matching
against entity type names, descriptions, and examples.
Falls back to 'Entity' if no ontology or no match found.
"""
if not ontology:
return "Entity"
entity_types = ontology.get("entity_types", [])
if not entity_types:
return "Entity"
name_lower = (name or "").lower()
summary_lower = (summary or "").lower()
search_text = f"{name_lower} {summary_lower}"
best_type = "Entity"
best_score = 0
for et in entity_types:
score = 0
type_name = et.get("name", "")
type_name_lower = type_name.lower()
# Exact name match in type name
if type_name_lower in name_lower:
score += 10
# Check examples list
for example in et.get("examples", []):
if example.lower() in search_text:
score += 8
elif name_lower in example.lower():
score += 6
# Check description keywords
desc_words = (et.get("description", "")).lower().split()
for word in desc_words:
if len(word) > 4 and word in search_text:
score += 1
if score > best_score:
best_score = score
best_type = type_name
return best_type if best_score > 0 else "Entity"
@dataclass
class GraphInfo:
"""图谱信息"""
@ -44,11 +91,7 @@ class GraphBuilderService:
"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or Config.ZEP_API_KEY
if not self.api_key:
raise ValueError("ZEP_API_KEY 未配置")
self.client = Zep(api_key=self.api_key)
self.client = GraphitiAdapter()
self.task_manager = TaskManager()
def build_graph_async(
@ -203,106 +246,27 @@ class GraphBuilderService:
return graph_id
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
"""设置图谱本体(公开方法)"""
import warnings
from typing import Optional
from pydantic import Field
from zep_cloud.external_clients.ontology import EntityModel, EntityText, EdgeModel
# 抑制 Pydantic v2 关于 Field(default=None) 的警告
# 这是 Zep SDK 要求的用法,警告来自动态类创建,可以安全忽略
warnings.filterwarnings('ignore', category=UserWarning, module='pydantic')
# Zep 保留名称,不能作为属性名
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
"""将保留名称转换为安全名称"""
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
# 动态创建实体类型
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
description = entity_def.get("description", f"A {name} entity.")
# 创建属性字典和类型注解Pydantic v2 需要)
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"]) # 使用安全名称
attr_desc = attr_def.get("description", attr_name)
# Zep API 需要 Field 的 description这是必需的
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText] # 类型注解
attrs["__annotations__"] = annotations
# 动态创建类
entity_class = type(name, (EntityModel,), attrs)
entity_class.__doc__ = description
entity_types[name] = entity_class
# 动态创建边类型
edge_definitions = {}
for edge_def in ontology.get("edge_types", []):
name = edge_def["name"]
description = edge_def.get("description", f"A {name} relationship.")
# 创建属性字典和类型注解
attrs = {"__doc__": description}
annotations = {}
for attr_def in edge_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"]) # 使用安全名称
attr_desc = attr_def.get("description", attr_name)
# Zep API 需要 Field 的 description这是必需的
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[str] # 边属性用str类型
attrs["__annotations__"] = annotations
# 动态创建类
class_name = ''.join(word.capitalize() for word in name.split('_'))
edge_class = type(class_name, (EdgeModel,), attrs)
edge_class.__doc__ = description
# 构建source_targets
source_targets = []
for st in edge_def.get("source_targets", []):
source_targets.append(
EntityEdgeSourceTarget(
source=st.get("source", "Entity"),
target=st.get("target", "Entity")
)
)
if source_targets:
edge_definitions[name] = (edge_class, source_targets)
# 调用Zep API设置本体
if entity_types or edge_definitions:
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=entity_types if entity_types else None,
edges=edge_definitions if edge_definitions else None,
)
"""设置图谱本体提示Graphiti自动提取实体本体作为提示存储"""
self.client.graph.set_ontology(
graph_ids=[graph_id],
entities=ontology.get("entity_types"),
edges=ontology.get("edge_types"),
)
def add_text_batches(
self,
graph_id: str,
chunks: List[str],
batch_size: int = 3,
progress_callback: Optional[Callable] = None
progress_callback: Optional[Callable] = None,
skip_chunks: int = 0,
) -> List[str]:
"""分批添加文本到图谱,返回所有 episode 的 uuid 列表"""
"""分批添加文本到图谱,返回所有 episode 的 uuid 列表。
skip_chunks: 跳过已处理的块数用于断点续传"""
episode_uuids = []
total_chunks = len(chunks)
for i in range(0, total_chunks, batch_size):
for i in range(skip_chunks, total_chunks, batch_size):
batch_chunks = chunks[i:i + batch_size]
batch_num = i // batch_size + 1
total_batches = (total_chunks + batch_size - 1) // batch_size
@ -313,10 +277,11 @@ class GraphBuilderService:
t('progress.sendingBatch', current=batch_num, total=total_batches, chunks=len(batch_chunks)),
progress
)
# 构建episode数据
episodes = [
EpisodeData(data=chunk, type="text")
type('Episode', (), {'data': chunk, 'type': 'text'})()
for chunk in batch_chunks
]
@ -423,7 +388,7 @@ class GraphBuilderService:
entity_types=list(entity_types)
)
def get_graph_data(self, graph_id: str) -> Dict[str, Any]:
def get_graph_data(self, graph_id: str, ontology: Optional[Dict] = None) -> Dict[str, Any]:
"""
获取完整图谱数据包含详细信息
@ -448,10 +413,15 @@ class GraphBuilderService:
if created_at:
created_at = str(created_at)
entity_type = _classify_entity_type(node.name, node.summary or "", ontology)
labels = node.labels or []
if entity_type != "Entity" and entity_type not in labels:
labels = [entity_type] + [l for l in labels if l != "Entity"]
nodes_data.append({
"uuid": node.uuid_,
"name": node.name,
"labels": node.labels or [],
"labels": labels,
"summary": node.summary or "",
"attributes": node.attributes or {},
"created_at": created_at,

View File

@ -0,0 +1,491 @@
"""
Graphiti Adapter Drop-in replacement for the Zep Cloud client.
Exposes the same namespace as the Zep client so all consuming code
(graph_builder, zep_tools, zep_entity_reader, etc.) needs only a
one-line import swap:
from .graphiti_adapter import GraphitiAdapter
self.client = GraphitiAdapter()
Then all self.client.graph.* calls work unchanged.
"""
import asyncio
import threading
import uuid as _uuid_mod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType, EntityNode
from graphiti_core.edges import EntityEdge
from graphiti_core.search.search_config import SearchConfig, SearchResults
from graphiti_core.search.search_config_recipes import (
NODE_HYBRID_SEARCH_RRF,
EDGE_HYBRID_SEARCH_RRF,
)
from graphiti_core.llm_client.config import LLMConfig
from graphiti_core.llm_client.gemini_client import GeminiClient
from graphiti_core.embedder.gemini import GeminiEmbedder, GeminiEmbedderConfig
from graphiti_core.cross_encoder.client import CrossEncoderClient
from ..config import Config
from ..utils.logger import get_logger
logger = get_logger('mirofish.graphiti_adapter')
class _GeminiReranker(CrossEncoderClient):
"""Simple reranker using Gemini — returns passages sorted by relevance."""
def __init__(self, client: GeminiClient):
self._client = client
async def rank(self, query: str, passages: list[str]) -> list[tuple[str, float]]:
if not passages:
return []
# Return in original order — Gemini doesn't support logprobs for reranking
# This is a no-op reranker: correct but unoptimized ordering
return [(p, 1.0 - i * 0.01) for i, p in enumerate(passages)]
# ---------------------------------------------------------------------------
# Persistent event loop in a dedicated background thread.
# All async calls are submitted here so the Neo4j driver (which is bound
# to one event loop) never crosses loop boundaries.
# ---------------------------------------------------------------------------
_loop: Optional[asyncio.AbstractEventLoop] = None
_loop_thread: Optional[threading.Thread] = None
_loop_lock = threading.Lock()
def _get_loop() -> asyncio.AbstractEventLoop:
global _loop, _loop_thread
if _loop is None:
with _loop_lock:
if _loop is None:
_loop = asyncio.new_event_loop()
_loop_thread = threading.Thread(
target=_loop.run_forever, daemon=True, name="graphiti-event-loop"
)
_loop_thread.start()
return _loop
def _run(coro):
"""Submit coroutine to the persistent event loop thread and wait for result."""
future = asyncio.run_coroutine_threadsafe(coro, _get_loop())
return future.result(timeout=300)
# ---------------------------------------------------------------------------
# Singleton Graphiti instance (one Neo4j driver for the whole process)
# ---------------------------------------------------------------------------
_graphiti_instance: Optional[Graphiti] = None
_graphiti_lock = threading.Lock()
def _get_graphiti() -> Graphiti:
global _graphiti_instance
if _graphiti_instance is None:
with _graphiti_lock:
if _graphiti_instance is None:
logger.info("Initializing Graphiti client...")
llm_cfg = LLMConfig(
api_key=Config.LLM_API_KEY,
model=Config.LLM_MODEL_NAME,
)
llm_client = GeminiClient(config=llm_cfg)
embedder = GeminiEmbedder(
config=GeminiEmbedderConfig(
api_key=Config.LLM_API_KEY,
embedding_model=Config.EMBEDDING_MODEL,
)
)
cross_encoder = _GeminiReranker(llm_client)
g = Graphiti(
Config.NEO4J_URI,
Config.NEO4J_USER,
Config.NEO4J_PASSWORD,
llm_client=llm_client,
embedder=embedder,
cross_encoder=cross_encoder,
)
# Use the persistent loop so the driver is bound to it from the start
_run(g.build_indices_and_constraints())
_graphiti_instance = g
logger.info("Graphiti client ready.")
return _graphiti_instance
# ---------------------------------------------------------------------------
# Compatibility data classes (mimic Zep response objects)
# ---------------------------------------------------------------------------
@dataclass
class _NodeResult:
"""Zep-compatible node object."""
uuid_: str
name: str
labels: List[str]
summary: str
attributes: Dict[str, Any]
created_at: Optional[str] = None
@property
def uuid(self):
return self.uuid_
@dataclass
class _EdgeResult:
"""Zep-compatible edge object."""
uuid_: str
name: str
fact: str
source_node_uuid: str
target_node_uuid: str
attributes: Dict[str, Any]
created_at: Optional[str] = None
valid_at: Optional[str] = None
invalid_at: Optional[str] = None
expired_at: Optional[str] = None
@property
def uuid(self):
return self.uuid_
@dataclass
class _EpisodeResult:
"""Zep-compatible episode object — always processed (Graphiti is sync)."""
uuid_: str
processed: bool = True
@property
def uuid(self):
return self.uuid_
@dataclass
class _SearchResults:
"""Zep-compatible search result object."""
edges: List[_EdgeResult] = field(default_factory=list)
nodes: List[_NodeResult] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Helpers: convert Graphiti objects → Zep-compatible objects
# ---------------------------------------------------------------------------
def _to_ts(dt: Optional[datetime]) -> Optional[str]:
if dt is None:
return None
return dt.isoformat()
def _entity_node_to_result(n: EntityNode) -> _NodeResult:
return _NodeResult(
uuid_=n.uuid,
name=n.name,
labels=list(n.labels) if n.labels else ["Entity"],
summary=n.summary or "",
attributes=n.attributes or {},
created_at=_to_ts(n.created_at),
)
def _entity_edge_to_result(e: EntityEdge) -> _EdgeResult:
return _EdgeResult(
uuid_=e.uuid,
name=e.name or "",
fact=e.fact or "",
source_node_uuid=e.source_node_uuid,
target_node_uuid=e.target_node_uuid,
attributes={},
created_at=_to_ts(e.created_at),
valid_at=_to_ts(e.valid_at),
invalid_at=_to_ts(e.invalid_at),
expired_at=_to_ts(e.expired_at),
)
def _neo4j_record_to_node(record: Dict) -> _NodeResult:
labels = record.get("labels", ["Entity"])
if isinstance(labels, (list, tuple)):
labels = [str(l) for l in labels]
return _NodeResult(
uuid_=record.get("uuid", ""),
name=record.get("name", ""),
labels=labels,
summary=record.get("summary", ""),
attributes=record.get("attributes") or {},
created_at=str(record.get("created_at", "")) or None,
)
def _neo4j_record_to_edge(record: Dict) -> _EdgeResult:
def ts(v):
return str(v) if v else None
return _EdgeResult(
uuid_=record.get("uuid", ""),
name=record.get("name", ""),
fact=record.get("fact", ""),
source_node_uuid=record.get("source_node_uuid", ""),
target_node_uuid=record.get("target_node_uuid", ""),
attributes=record.get("attributes") or {},
created_at=ts(record.get("created_at")),
valid_at=ts(record.get("valid_at")),
invalid_at=ts(record.get("invalid_at")),
expired_at=ts(record.get("expired_at")),
)
# ---------------------------------------------------------------------------
# Neo4j direct query helpers
# ---------------------------------------------------------------------------
async def _neo4j_query(graphiti: Graphiti, cypher: str, params: Dict) -> List[Dict]:
"""Execute a read Cypher query and return list of record dicts."""
records, _, _ = await graphiti.driver.execute_query(cypher, params)
return [dict(r) for r in records]
async def _neo4j_write(graphiti: Graphiti, cypher: str, params: Dict) -> None:
"""Execute a write Cypher query."""
await graphiti.driver.execute_query(cypher, params)
# Cypher queries
_NODES_BY_GROUP = """
MATCH (n:Entity {group_id: $group_id})
RETURN n.uuid AS uuid, n.name AS name, n.summary AS summary,
labels(n) AS labels, n.created_at AS created_at,
n.attributes AS attributes
ORDER BY n.created_at ASC
SKIP $skip LIMIT $limit
"""
_EDGES_BY_GROUP = """
MATCH (s:Entity {group_id: $group_id})-[r:RELATES_TO]->(t:Entity {group_id: $group_id})
RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact,
s.uuid AS source_node_uuid,
t.uuid AS target_node_uuid,
r.created_at AS created_at, r.valid_at AS valid_at,
r.invalid_at AS invalid_at, r.expired_at AS expired_at,
r.attributes AS attributes
ORDER BY r.created_at ASC
SKIP $skip LIMIT $limit
"""
_NODE_BY_UUID = """
MATCH (n:Entity {uuid: $uuid})
RETURN n.uuid AS uuid, n.name AS name, n.summary AS summary,
labels(n) AS labels, n.created_at AS created_at,
n.group_id AS group_id, n.attributes AS attributes
LIMIT 1
"""
_EDGES_BY_NODE_UUID = """
MATCH (s:Entity {uuid: $node_uuid})-[r:RELATES_TO]->(t:Entity)
RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact,
s.uuid AS source_node_uuid,
t.uuid AS target_node_uuid,
r.created_at AS created_at, r.valid_at AS valid_at,
r.invalid_at AS invalid_at, r.expired_at AS expired_at
UNION
MATCH (s:Entity)-[r:RELATES_TO]->(t:Entity {uuid: $node_uuid})
RETURN r.uuid AS uuid, r.name AS name, r.fact AS fact,
s.uuid AS source_node_uuid,
t.uuid AS target_node_uuid,
r.created_at AS created_at, r.valid_at AS valid_at,
r.invalid_at AS invalid_at, r.expired_at AS expired_at
"""
_DELETE_GROUP = """
MATCH (n:Entity {group_id: $group_id})
DETACH DELETE n
"""
# ---------------------------------------------------------------------------
# Sub-namespaces
# ---------------------------------------------------------------------------
class _EpisodeNamespace:
def get(self, uuid_: str) -> _EpisodeResult:
"""Always returns processed=True — Graphiti is synchronous."""
return _EpisodeResult(uuid_=uuid_, processed=True)
class _NodeNamespace:
def __init__(self, graphiti: Graphiti):
self._g = graphiti
def get_by_graph_id(
self,
graph_id: str,
limit: int = 100,
uuid_cursor: Optional[str] = None,
) -> List[_NodeResult]:
"""Return nodes for a group. First call returns all; cursor call returns empty."""
if uuid_cursor is not None:
# Already fetched all on first call — signal end of pagination
return []
records = _run(_neo4j_query(
self._g, _NODES_BY_GROUP,
{"group_id": graph_id, "skip": 0, "limit": 10000}
))
return [_neo4j_record_to_node(r) for r in records]
def get(self, uuid_: str) -> _NodeResult:
records = _run(_neo4j_query(self._g, _NODE_BY_UUID, {"uuid": uuid_}))
if not records:
return _NodeResult(uuid_=uuid_, name="", labels=[], summary="", attributes={})
return _neo4j_record_to_node(records[0])
def get_entity_edges(self, node_uuid: str) -> List[_EdgeResult]:
records = _run(_neo4j_query(
self._g, _EDGES_BY_NODE_UUID, {"node_uuid": node_uuid}
))
return [_neo4j_record_to_edge(r) for r in records]
class _EdgeNamespace:
def __init__(self, graphiti: Graphiti):
self._g = graphiti
def get_by_graph_id(
self,
graph_id: str,
limit: int = 100,
uuid_cursor: Optional[str] = None,
) -> List[_EdgeResult]:
"""Return edges for a group. First call returns all; cursor call returns empty."""
if uuid_cursor is not None:
return []
records = _run(_neo4j_query(
self._g, _EDGES_BY_GROUP,
{"group_id": graph_id, "skip": 0, "limit": 50000}
))
return [_neo4j_record_to_edge(r) for r in records]
class _GraphNamespace:
def __init__(self, graphiti: Graphiti):
self._g = graphiti
self.node = _NodeNamespace(graphiti)
self.edge = _EdgeNamespace(graphiti)
self.episode = _EpisodeNamespace()
self._ontologies: Dict[str, Dict] = {} # graph_id -> ontology dict
def create(self, graph_id: str, name: str, description: str = "") -> None:
"""No-op — Graphiti uses group_id implicitly, no explicit creation needed."""
logger.info(f"Graph '{graph_id}' registered (group_id in Graphiti)")
def set_ontology(
self,
graph_ids: List[str],
entities: Any = None,
edges: Any = None,
) -> None:
"""Store ontology hints for use during episode ingestion. Graphiti extracts entities dynamically."""
for gid in graph_ids:
self._ontologies[gid] = {"entities": entities, "edges": edges}
logger.info(f"Ontology hints stored for graphs: {graph_ids}")
def add(self, graph_id: str, type: str = "text", data: str = "") -> _EpisodeResult:
"""Add a single text episode to the graph."""
result = _run(self._g.add_episode(
name=f"activity_{_uuid_mod.uuid4().hex[:8]}",
episode_body=data,
source_description="MiroFish simulation activity",
reference_time=datetime.now(timezone.utc),
source=EpisodeType.text,
group_id=graph_id,
update_communities=False,
))
ep_uuid_out = result.episode.uuid if result and result.episode else str(_uuid_mod.uuid4())
return _EpisodeResult(uuid_=ep_uuid_out)
def add_batch(self, graph_id: str, episodes: List[Any]) -> List[_EpisodeResult]:
"""Add a batch of episodes. Returns list of EpisodeResult with uuid_."""
results = []
for ep in episodes:
text = getattr(ep, 'data', '') or str(ep)
try:
result = _run(self._g.add_episode(
name=f"chunk_{_uuid_mod.uuid4().hex[:8]}",
episode_body=text,
source_description="MiroFish document chunk",
reference_time=datetime.now(timezone.utc),
source=EpisodeType.text,
group_id=graph_id,
update_communities=False,
))
ep_uuid_out = result.episode.uuid if result and result.episode else str(_uuid_mod.uuid4())
except Exception as e:
logger.warning(f"Episode add failed: {str(e)[:100]}, using placeholder uuid")
ep_uuid_out = str(_uuid_mod.uuid4())
results.append(_EpisodeResult(uuid_=ep_uuid_out))
return results
def search(
self,
graph_id: str,
query: str,
limit: int = 10,
scope: str = "edges",
reranker: Optional[str] = None,
) -> _SearchResults:
"""Semantic search over the graph. scope='edges'|'nodes'|'both'."""
try:
if scope == "nodes":
results = _run(self._g.search_(
query=query,
config=SearchConfig(
node_config=NODE_HYBRID_SEARCH_RRF.node_config,
limit=limit,
),
group_ids=[graph_id],
))
nodes = [_entity_node_to_result(n) for n in (results.nodes or [])]
return _SearchResults(nodes=nodes)
else:
edges = _run(self._g.search(
query=query,
group_ids=[graph_id],
num_results=limit,
))
return _SearchResults(edges=[_entity_edge_to_result(e) for e in (edges or [])])
except Exception as e:
logger.warning(f"Graph search failed: {str(e)[:150]}")
return _SearchResults()
def delete(self, graph_id: str) -> None:
"""Delete all nodes and edges for a group_id."""
_run(_neo4j_write(self._g, _DELETE_GROUP, {"group_id": graph_id}))
logger.info(f"Graph '{graph_id}' deleted from Neo4j")
# ---------------------------------------------------------------------------
# Main adapter class — drop-in for Zep(api_key=...)
# ---------------------------------------------------------------------------
class GraphitiAdapter:
"""
Drop-in replacement for `from zep_cloud.client import Zep`.
Usage:
self.client = GraphitiAdapter()
self.client.graph.create(graph_id, name)
self.client.graph.search(graph_id, query, limit, scope)
self.client.graph.node.get(uuid_)
...
"""
def __init__(self, api_key: Optional[str] = None):
# api_key ignored — kept for signature compatibility
graphiti = _get_graphiti()
self.graph = _GraphNamespace(graphiti)

View File

@ -16,7 +16,7 @@ from dataclasses import dataclass, field
from datetime import datetime
from openai import OpenAI
from zep_cloud.client import Zep
from .graphiti_adapter import GraphitiAdapter
from ..config import Config
from ..utils.logger import get_logger
@ -198,16 +198,8 @@ class OasisProfileGenerator:
base_url=self.base_url
)
# Zep客户端用于检索丰富上下文
self.zep_api_key = zep_api_key or Config.ZEP_API_KEY
self.zep_client = None
self.zep_client = GraphitiAdapter()
self.graph_id = graph_id
if self.zep_api_key:
try:
self.zep_client = Zep(api_key=self.zep_api_key)
except Exception as e:
logger.warning(f"Zep客户端初始化失败: {e}")
def generate_profile_from_entity(
self,

View File

@ -7,7 +7,7 @@ import time
from typing import Dict, Any, List, Optional, Set, Callable, TypeVar
from dataclasses import dataclass, field
from zep_cloud.client import Zep
from .graphiti_adapter import GraphitiAdapter
from ..config import Config
from ..utils.logger import get_logger
@ -79,11 +79,7 @@ class ZepEntityReader:
"""
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key or Config.ZEP_API_KEY
if not self.api_key:
raise ValueError("ZEP_API_KEY 未配置")
self.client = Zep(api_key=self.api_key)
self.client = GraphitiAdapter()
def _call_with_retry(
self,
@ -234,27 +230,51 @@ class ZepEntityReader:
FilteredEntities: 过滤后的实体集合
"""
logger.info(f"开始筛选图谱 {graph_id} 的实体...")
# Look up ontology from project to classify entities
ontology = None
try:
from ..models.project import ProjectManager
from .graph_builder import _classify_entity_type
for p in ProjectManager.list_projects():
if p.graph_id == graph_id and p.ontology:
ontology = p.ontology
break
except Exception:
pass
# 获取所有节点
all_nodes = self.get_all_nodes(graph_id)
total_count = len(all_nodes)
# Apply ontology-based classification so all nodes get proper type labels
if ontology:
for node in all_nodes:
labels = node.get("labels", [])
custom = [l for l in labels if l not in ("Entity", "Node")]
if not custom:
entity_type = _classify_entity_type(
node.get("name", ""), node.get("summary", ""), ontology
)
if entity_type != "Entity":
node["labels"] = [entity_type] + labels
# 获取所有边(用于后续关联查找)
all_edges = self.get_all_edges(graph_id) if enrich_with_edges else []
# 构建节点UUID到节点数据的映射
node_map = {n["uuid"]: n for n in all_nodes}
# 筛选符合条件的实体
filtered_entities = []
entity_types_found = set()
for node in all_nodes:
labels = node.get("labels", [])
# 筛选逻辑Labels必须包含除"Entity"和"Node"之外的标签
custom_labels = [l for l in labels if l not in ["Entity", "Node"]]
if not custom_labels:
# 只有默认标签,跳过
continue

View File

@ -12,7 +12,7 @@ from dataclasses import dataclass
from datetime import datetime
from queue import Queue, Empty
from zep_cloud.client import Zep
from .graphiti_adapter import GraphitiAdapter
from ..config import Config
from ..utils.logger import get_logger
@ -238,12 +238,7 @@ class ZepGraphMemoryUpdater:
api_key: Zep API Key可选默认从配置读取
"""
self.graph_id = graph_id
self.api_key = api_key or Config.ZEP_API_KEY
if not self.api_key:
raise ValueError("ZEP_API_KEY未配置")
self.client = Zep(api_key=self.api_key)
self.client = GraphitiAdapter()
# 活动队列
self._activity_queue: Queue = Queue()

View File

@ -13,12 +13,11 @@ import json
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from zep_cloud.client import Zep
from .graphiti_adapter import GraphitiAdapter
from ..config import Config
from ..utils.logger import get_logger
from ..utils.llm_client import LLMClient
from ..utils.locale import get_locale, t
from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
logger = get_logger('mirofish.zep_tools')
@ -423,14 +422,10 @@ class ZepToolsService:
RETRY_DELAY = 2.0
def __init__(self, api_key: Optional[str] = None, llm_client: Optional[LLMClient] = None):
self.api_key = api_key or Config.ZEP_API_KEY
if not self.api_key:
raise ValueError("ZEP_API_KEY 未配置")
self.client = Zep(api_key=self.api_key)
self.client = GraphitiAdapter()
# LLM客户端用于InsightForge生成子问题
self._llm_client = llm_client
logger.info(t("console.zepToolsInitialized"))
logger.info("ZepToolsService 初始化完成")
@property
def llm(self) -> LLMClient:
@ -440,25 +435,38 @@ class ZepToolsService:
return self._llm_client
def _call_with_retry(self, func, operation_name: str, max_retries: int = None):
"""带重试机制的API调用"""
"""带重试机制的API调用自动处理429限速"""
max_retries = max_retries or self.MAX_RETRIES
last_exception = None
delay = self.RETRY_DELAY
for attempt in range(max_retries):
try:
return func()
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
logger.warning(
t("console.zepRetryAttempt", operation=operation_name, attempt=attempt + 1, error=str(e)[:100], delay=f"{delay:.1f}")
)
time.sleep(delay)
# 检测429限速错误使用retry-after头部的等待时间
wait = delay
if hasattr(e, 'status_code') and e.status_code == 429:
retry_after = None
if hasattr(e, 'headers') and e.headers:
retry_after = e.headers.get('retry-after')
wait = float(retry_after) + 1 if retry_after else 65.0
logger.warning(
f"Zep {operation_name} 触发限速 (429), "
f"等待 {wait:.0f} 秒后重试 (第 {attempt + 1}/{max_retries - 1} 次)..."
)
else:
logger.warning(
f"Zep {operation_name}{attempt + 1} 次尝试失败: {str(e)[:100]}, "
f"{wait:.1f}秒后重试..."
)
time.sleep(wait)
delay *= 2
else:
logger.error(t("console.zepAllRetriesFailed", operation=operation_name, retries=max_retries, error=str(e)))
logger.error(f"Zep {operation_name}{max_retries} 次尝试后仍失败: {str(e)}")
raise last_exception
def search_graph(
@ -483,7 +491,7 @@ class ZepToolsService:
Returns:
SearchResult: 搜索结果
"""
logger.info(t("console.graphSearch", graphId=graph_id, query=query[:50]))
logger.info(f"图谱搜索: graph_id={graph_id}, query={query[:50]}...")
# 尝试使用Zep Cloud Search API
try:
@ -495,7 +503,7 @@ class ZepToolsService:
scope=scope,
reranker="cross_encoder"
),
operation_name=t("console.graphSearchOp", graphId=graph_id)
operation_name=f"图谱搜索(graph={graph_id})"
)
facts = []
@ -528,7 +536,7 @@ class ZepToolsService:
if hasattr(node, 'summary') and node.summary:
facts.append(f"[{node.name}]: {node.summary}")
logger.info(t("console.searchComplete", count=len(facts)))
logger.info(f"搜索完成: 找到 {len(facts)} 条相关事实")
return SearchResult(
facts=facts,
@ -539,7 +547,7 @@ class ZepToolsService:
)
except Exception as e:
logger.warning(t("console.zepSearchApiFallback", error=str(e)))
logger.warning(f"Zep Search API失败降级为本地搜索: {str(e)}")
# 降级:使用本地关键词匹配搜索
return self._local_search(graph_id, query, limit, scope)
@ -564,7 +572,7 @@ class ZepToolsService:
Returns:
SearchResult: 搜索结果
"""
logger.info(t("console.usingLocalSearch", query=query[:30]))
logger.info(f"使用本地搜索: query={query[:30]}...")
facts = []
edges_result = []
@ -634,10 +642,10 @@ class ZepToolsService:
if node.summary:
facts.append(f"[{node.name}]: {node.summary}")
logger.info(t("console.localSearchComplete", count=len(facts)))
logger.info(f"本地搜索完成: 找到 {len(facts)} 条相关事实")
except Exception as e:
logger.error(t("console.localSearchFailed", error=str(e)))
logger.error(f"本地搜索失败: {str(e)}")
return SearchResult(
facts=facts,
@ -657,7 +665,7 @@ class ZepToolsService:
Returns:
节点列表
"""
logger.info(t("console.fetchingAllNodes", graphId=graph_id))
logger.info(f"获取图谱 {graph_id} 的所有节点...")
nodes = fetch_all_nodes(self.client, graph_id)
@ -672,7 +680,7 @@ class ZepToolsService:
attributes=node.attributes or {}
))
logger.info(t("console.fetchedNodes", count=len(result)))
logger.info(f"获取到 {len(result)} 个节点")
return result
def get_all_edges(self, graph_id: str, include_temporal: bool = True) -> List[EdgeInfo]:
@ -686,7 +694,7 @@ class ZepToolsService:
Returns:
边列表包含created_at, valid_at, invalid_at, expired_at
"""
logger.info(t("console.fetchingAllEdges", graphId=graph_id))
logger.info(f"获取图谱 {graph_id} 的所有边...")
edges = fetch_all_edges(self.client, graph_id)
@ -710,7 +718,7 @@ class ZepToolsService:
result.append(edge_info)
logger.info(t("console.fetchedEdges", count=len(result)))
logger.info(f"获取到 {len(result)} 条边")
return result
def get_node_detail(self, node_uuid: str) -> Optional[NodeInfo]:
@ -723,12 +731,12 @@ class ZepToolsService:
Returns:
节点信息或None
"""
logger.info(t("console.fetchingNodeDetail", uuid=node_uuid[:8]))
logger.info(f"获取节点详情: {node_uuid[:8]}...")
try:
node = self._call_with_retry(
func=lambda: self.client.graph.node.get(uuid_=node_uuid),
operation_name=t("console.fetchNodeDetailOp", uuid=node_uuid[:8])
operation_name=f"获取节点详情(uuid={node_uuid[:8]}...)"
)
if not node:
@ -742,7 +750,7 @@ class ZepToolsService:
attributes=node.attributes or {}
)
except Exception as e:
logger.error(t("console.fetchNodeDetailFailed", error=str(e)))
logger.error(f"获取节点详情失败: {str(e)}")
return None
def get_node_edges(self, graph_id: str, node_uuid: str) -> List[EdgeInfo]:
@ -758,7 +766,7 @@ class ZepToolsService:
Returns:
边列表
"""
logger.info(t("console.fetchingNodeEdges", uuid=node_uuid[:8]))
logger.info(f"获取节点 {node_uuid[:8]}... 的相关边")
try:
# 获取图谱所有边,然后过滤
@ -770,11 +778,11 @@ class ZepToolsService:
if edge.source_node_uuid == node_uuid or edge.target_node_uuid == node_uuid:
result.append(edge)
logger.info(t("console.foundNodeEdges", count=len(result)))
logger.info(f"找到 {len(result)} 条与节点相关的边")
return result
except Exception as e:
logger.warning(t("console.fetchNodeEdgesFailed", error=str(e)))
logger.warning(f"获取节点边失败: {str(e)}")
return []
def get_entities_by_type(
@ -792,7 +800,7 @@ class ZepToolsService:
Returns:
符合类型的实体列表
"""
logger.info(t("console.fetchingEntitiesByType", type=entity_type))
logger.info(f"获取类型为 {entity_type} 的实体...")
all_nodes = self.get_all_nodes(graph_id)
@ -802,7 +810,7 @@ class ZepToolsService:
if entity_type in node.labels:
filtered.append(node)
logger.info(t("console.foundEntitiesByType", count=len(filtered), type=entity_type))
logger.info(f"找到 {len(filtered)}{entity_type} 类型的实体")
return filtered
def get_entity_summary(
@ -822,7 +830,7 @@ class ZepToolsService:
Returns:
实体摘要信息
"""
logger.info(t("console.fetchingEntitySummary", name=entity_name))
logger.info(f"获取实体 {entity_name} 的关系摘要...")
# 先搜索该实体相关的信息
search_result = self.search_graph(
@ -862,7 +870,7 @@ class ZepToolsService:
Returns:
统计信息
"""
logger.info(t("console.fetchingGraphStats", graphId=graph_id))
logger.info(f"获取图谱 {graph_id} 的统计信息...")
nodes = self.get_all_nodes(graph_id)
edges = self.get_all_edges(graph_id)
@ -906,7 +914,7 @@ class ZepToolsService:
Returns:
模拟上下文信息
"""
logger.info(t("console.fetchingSimContext", requirement=simulation_requirement[:50]))
logger.info(f"获取模拟上下文: {simulation_requirement[:50]}...")
# 搜索与模拟需求相关的信息
search_result = self.search_graph(
@ -970,7 +978,7 @@ class ZepToolsService:
Returns:
InsightForgeResult: 深度洞察检索结果
"""
logger.info(t("console.insightForgeStart", query=query[:50]))
logger.info(f"InsightForge 深度洞察检索: {query[:50]}...")
result = InsightForgeResult(
query=query,
@ -986,7 +994,7 @@ class ZepToolsService:
max_queries=max_sub_queries
)
result.sub_queries = sub_queries
logger.info(t("console.generatedSubQueries", count=len(sub_queries)))
logger.info(f"生成 {len(sub_queries)} 个子问题")
# Step 2: 对每个子问题进行语义搜索
all_facts = []
@ -1086,7 +1094,7 @@ class ZepToolsService:
result.relationship_chains = relationship_chains
result.total_relationships = len(relationship_chains)
logger.info(t("console.insightForgeComplete", facts=result.total_facts, entities=result.total_entities, relationships=result.total_relationships))
logger.info(f"InsightForge完成: {result.total_facts}条事实, {result.total_entities}个实体, {result.total_relationships}条关系")
return result
def _generate_sub_queries(
@ -1133,7 +1141,7 @@ class ZepToolsService:
return [str(sq) for sq in sub_queries[:max_queries]]
except Exception as e:
logger.warning(t("console.generateSubQueriesFailed", error=str(e)))
logger.warning(f"生成子问题失败: {str(e)},使用默认子问题")
# 降级:返回基于原问题的变体
return [
query,
@ -1168,7 +1176,7 @@ class ZepToolsService:
Returns:
PanoramaResult: 广度搜索结果
"""
logger.info(t("console.panoramaSearchStart", query=query[:50]))
logger.info(f"PanoramaSearch 广度搜索: {query[:50]}...")
result = PanoramaResult(query=query)
@ -1231,7 +1239,7 @@ class ZepToolsService:
result.active_count = len(active_facts)
result.historical_count = len(historical_facts)
logger.info(t("console.panoramaSearchComplete", active=result.active_count, historical=result.historical_count))
logger.info(f"PanoramaSearch完成: {result.active_count}条有效, {result.historical_count}条历史")
return result
def quick_search(
@ -1256,7 +1264,7 @@ class ZepToolsService:
Returns:
SearchResult: 搜索结果
"""
logger.info(t("console.quickSearchStart", query=query[:50]))
logger.info(f"QuickSearch 简单搜索: {query[:50]}...")
# 直接调用现有的search_graph方法
result = self.search_graph(
@ -1266,7 +1274,7 @@ class ZepToolsService:
scope="edges"
)
logger.info(t("console.quickSearchComplete", count=result.total_count))
logger.info(f"QuickSearch完成: {result.total_count}条结果")
return result
def interview_agents(
@ -1306,7 +1314,7 @@ class ZepToolsService:
"""
from .simulation_runner import SimulationRunner
logger.info(t("console.interviewAgentsStart", requirement=interview_requirement[:50]))
logger.info(f"InterviewAgents 深度采访真实API: {interview_requirement[:50]}...")
result = InterviewResult(
interview_topic=interview_requirement,
@ -1317,12 +1325,12 @@ class ZepToolsService:
profiles = self._load_agent_profiles(simulation_id)
if not profiles:
logger.warning(t("console.profilesNotFound", simId=simulation_id))
logger.warning(f"未找到模拟 {simulation_id} 的人设文件")
result.summary = "未找到可采访的Agent人设文件"
return result
result.total_agents = len(profiles)
logger.info(t("console.loadedProfiles", count=len(profiles)))
logger.info(f"加载到 {len(profiles)} 个Agent人设")
# Step 2: 使用LLM选择要采访的Agent返回agent_id列表
selected_agents, selected_indices, selection_reasoning = self._select_agents_for_interview(
@ -1334,7 +1342,7 @@ class ZepToolsService:
result.selected_agents = selected_agents
result.selection_reasoning = selection_reasoning
logger.info(t("console.selectedAgentsForInterview", count=len(selected_agents), indices=selected_indices))
logger.info(f"选择了 {len(selected_agents)} 个Agent进行采访: {selected_indices}")
# Step 3: 生成采访问题(如果没有提供)
if not result.interview_questions:
@ -1343,7 +1351,7 @@ class ZepToolsService:
simulation_requirement=simulation_requirement,
selected_agents=selected_agents
)
logger.info(t("console.generatedInterviewQuestions", count=len(result.interview_questions)))
logger.info(f"生成了 {len(result.interview_questions)} 个采访问题")
# 将问题合并为一个采访prompt
combined_prompt = "\n".join([f"{i+1}. {q}" for i, q in enumerate(result.interview_questions)])
@ -1373,7 +1381,7 @@ class ZepToolsService:
# 不指定platformAPI会在twitter和reddit两个平台都采访
})
logger.info(t("console.callingBatchInterviewApi", count=len(interviews_request)))
logger.info(f"调用批量采访API双平台: {len(interviews_request)} 个Agent")
# 调用 SimulationRunner 的批量采访方法不传platform双平台采访
api_result = SimulationRunner.interview_agents_batch(
@ -1383,12 +1391,12 @@ class ZepToolsService:
timeout=180.0 # 双平台需要更长超时
)
logger.info(t("console.interviewApiReturned", count=api_result.get('interviews_count', 0), success=api_result.get('success')))
logger.info(f"采访API返回: {api_result.get('interviews_count', 0)} 个结果, success={api_result.get('success')}")
# 检查API调用是否成功
if not api_result.get("success", False):
error_msg = api_result.get("error", "未知错误")
logger.warning(t("console.interviewApiReturnedFailure", error=error_msg))
logger.warning(f"采访API返回失败: {error_msg}")
result.summary = f"采访API调用失败{error_msg}。请检查OASIS模拟环境状态。"
return result
@ -1461,11 +1469,11 @@ class ZepToolsService:
except ValueError as e:
# 模拟环境未运行
logger.warning(t("console.interviewApiCallFailed", error=e))
logger.warning(f"采访API调用失败环境未运行: {e}")
result.summary = f"采访失败:{str(e)}。模拟环境可能已关闭请确保OASIS环境正在运行。"
return result
except Exception as e:
logger.error(t("console.interviewApiCallException", error=e))
logger.error(f"采访API调用异常: {e}")
import traceback
logger.error(traceback.format_exc())
result.summary = f"采访过程发生错误:{str(e)}"
@ -1478,7 +1486,7 @@ class ZepToolsService:
interview_requirement=interview_requirement
)
logger.info(t("console.interviewAgentsComplete", count=result.interviewed_count))
logger.info(f"InterviewAgents完成: 采访了 {result.interviewed_count} 个Agent双平台")
return result
@staticmethod
@ -1521,10 +1529,10 @@ class ZepToolsService:
try:
with open(reddit_profile_path, 'r', encoding='utf-8') as f:
profiles = json.load(f)
logger.info(t("console.loadedRedditProfiles", count=len(profiles)))
logger.info(f"从 reddit_profiles.json 加载了 {len(profiles)} 个人设")
return profiles
except Exception as e:
logger.warning(t("console.readRedditProfilesFailed", error=e))
logger.warning(f"读取 reddit_profiles.json 失败: {e}")
# 尝试读取Twitter CSV格式
twitter_profile_path = os.path.join(sim_dir, "twitter_profiles.csv")
@ -1541,10 +1549,10 @@ class ZepToolsService:
"persona": row.get("user_char", ""),
"profession": "未知"
})
logger.info(t("console.loadedTwitterProfiles", count=len(profiles)))
logger.info(f"从 twitter_profiles.csv 加载了 {len(profiles)} 个人设")
return profiles
except Exception as e:
logger.warning(t("console.readTwitterProfilesFailed", error=e))
logger.warning(f"读取 twitter_profiles.csv 失败: {e}")
return profiles
@ -1625,7 +1633,7 @@ class ZepToolsService:
return selected_agents, valid_indices, reasoning
except Exception as e:
logger.warning(t("console.llmSelectAgentFailed", error=e))
logger.warning(f"LLM选择Agent失败使用默认选择: {e}")
# 降级选择前N个
selected = profiles[:max_agents]
indices = list(range(min(max_agents, len(profiles))))
@ -1673,7 +1681,7 @@ class ZepToolsService:
return response.get("questions", [f"关于{interview_requirement},您有什么看法?"])
except Exception as e:
logger.warning(t("console.generateInterviewQuestionsFailed", error=e))
logger.warning(f"生成采访问题失败: {e}")
return [
f"关于{interview_requirement},您的观点是什么?",
"这件事对您或您所代表的群体有什么影响?",
@ -1695,8 +1703,7 @@ class ZepToolsService:
for interview in interviews:
interview_texts.append(f"{interview.agent_name}{interview.agent_role})】\n{interview.response[:500]}")
quote_instruction = "引用受访者原话时使用中文引号「」" if get_locale() == 'zh' else 'Use quotation marks "" when quoting interviewees'
system_prompt = f"""你是一个专业的新闻编辑。请根据多位受访者的回答,生成一份采访摘要。
system_prompt = """你是一个专业的新闻编辑。请根据多位受访者的回答,生成一份采访摘要。
摘要要求
1. 提炼各方主要观点
@ -1709,7 +1716,7 @@ class ZepToolsService:
- 使用纯文本段落用空行分隔不同部分
- 不要使用Markdown标题#、##、###
- 不要使用分割线---***
- {quote_instruction}
- 引用受访者原话时使用中文引号
- 可以使用**加粗**标记关键词但不要使用其他Markdown语法"""
user_prompt = f"""采访主题:{interview_requirement}
@ -1731,6 +1738,6 @@ class ZepToolsService:
return summary
except Exception as e:
logger.warning(t("console.generateInterviewSummaryFailed", error=e))
logger.warning(f"生成采访摘要失败: {e}")
# 降级:简单拼接
return f"共采访了{len(interviews)}位受访者,包括:" + "".join([i.agent_name for i in interviews])

View File

@ -5,7 +5,8 @@ LLM客户端封装
import json
import re
from typing import Optional, Dict, Any, List
from typing import Any, Dict, List, Optional
from openai import OpenAI
from ..config import Config
@ -13,41 +14,38 @@ from ..config import Config
class LLMClient:
"""LLM客户端"""
def __init__(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: Optional[str] = None
model: Optional[str] = None,
):
self.api_key = api_key or Config.LLM_API_KEY
self.base_url = base_url or Config.LLM_BASE_URL
self.model = model or Config.LLM_MODEL_NAME
if not self.api_key:
raise ValueError("LLM_API_KEY 未配置")
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
def chat(
self,
messages: List[Dict[str, str]],
temperature: float = 0.7,
max_tokens: int = 4096,
response_format: Optional[Dict] = None
response_format: Optional[Dict] = None,
) -> str:
"""
发送聊天请求
Args:
messages: 消息列表
temperature: 温度参数
max_tokens: 最大token数
response_format: 响应格式如JSON模式
Returns:
模型响应文本
"""
@ -57,47 +55,39 @@ class LLMClient:
"temperature": temperature,
"max_tokens": max_tokens,
}
if response_format:
kwargs["response_format"] = response_format
response = self.client.chat.completions.create(**kwargs)
content = response.choices[0].message.content
# 部分模型如MiniMax M2.5会在content中包含<think>思考内容,需要移除
content = re.sub(r'<think>[\s\S]*?</think>', '', content).strip()
content = re.sub(r"<think>[\s\S]*?</think>", "", content).strip()
return content
def chat_json(
self,
messages: List[Dict[str, str]],
temperature: float = 0.3,
max_tokens: int = 4096
) -> Dict[str, Any]:
"""
发送聊天请求并返回JSON
Args:
messages: 消息列表
temperature: 温度参数
max_tokens: 最大token数
Returns:
解析后的JSON对象
"""
response = self.chat(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"}
)
def chat_json(self, messages, temperature=0.3, max_tokens=4096):
try:
response = self.chat(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"},
)
except Exception:
# Retry without response_format for unsupported providers
response = self.chat(
messages=messages, temperature=temperature, max_tokens=max_tokens
)
# 清理markdown代码块标记
cleaned_response = response.strip()
cleaned_response = re.sub(r'^```(?:json)?\s*\n?', '', cleaned_response, flags=re.IGNORECASE)
cleaned_response = re.sub(r'\n?```\s*$', '', cleaned_response)
cleaned_response = re.sub(
r"^```(?:json)?\s*\n?", "", cleaned_response, flags=re.IGNORECASE
)
cleaned_response = re.sub(r"\n?```\s*$", "", cleaned_response)
cleaned_response = cleaned_response.strip()
try:
return json.loads(cleaned_response)
except json.JSONDecodeError:
raise ValueError(f"LLM返回的JSON格式无效: {cleaned_response}")

View File

@ -10,8 +10,7 @@ import time
from collections.abc import Callable
from typing import Any
from zep_cloud import InternalServerError
from zep_cloud.client import Zep
from typing import Any
from .logger import get_logger
@ -31,7 +30,7 @@ def _fetch_page_with_retry(
page_description: str = "page",
**kwargs: Any,
) -> list[Any]:
"""单页请求,失败时指数退避重试。仅重试网络/IO类瞬态错误"""
"""单页请求,失败时指数退避重试。自动处理429限速"""
if max_retries < 1:
raise ValueError("max_retries must be >= 1")
@ -41,13 +40,15 @@ def _fetch_page_with_retry(
for attempt in range(max_retries):
try:
return api_call(*args, **kwargs)
except (ConnectionError, TimeoutError, OSError, InternalServerError) as e:
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
# 检测429限速使用retry-after头部指定的等待时间
wait = delay
logger.warning(
f"Zep {page_description} attempt {attempt + 1} failed: {str(e)[:100]}, retrying in {delay:.1f}s..."
f"Zep {page_description} attempt {attempt + 1} failed: {str(e)[:100]}, retrying in {wait:.1f}s..."
)
time.sleep(delay)
time.sleep(wait)
delay *= 2
else:
logger.error(f"Zep {page_description} failed after {max_retries} attempts: {str(e)}")
@ -57,7 +58,7 @@ def _fetch_page_with_retry(
def fetch_all_nodes(
client: Zep,
client: Any,
graph_id: str,
page_size: int = _DEFAULT_PAGE_SIZE,
max_items: int = _MAX_NODES,
@ -103,7 +104,7 @@ def fetch_all_nodes(
def fetch_all_edges(
client: Zep,
client: Any,
graph_id: str,
page_size: int = _DEFAULT_PAGE_SIZE,
max_retries: int = _DEFAULT_MAX_RETRIES,

View File

@ -12,26 +12,22 @@ dependencies = [
# 核心框架
"flask>=3.0.0",
"flask-cors>=6.0.0",
# LLM 相关
"openai>=1.0.0",
# Zep Cloud
"zep-cloud==3.13.0",
# OASIS 社交媒体模拟
"camel-oasis==0.2.5",
"camel-ai==0.2.78",
# 文件处理
"PyMuPDF>=1.24.0",
# 编码检测支持非UTF-8编码的文本文件
"charset-normalizer>=3.0.0",
"chardet>=5.0.0",
# 工具库
"python-dotenv>=1.0.0",
"pydantic>=2.0.0",
"graphiti-core>=0.3",
"google-genai>=1.68.0",
]
[project.optional-dependencies]

View File

@ -475,6 +475,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" },
]
[[package]]
name = "diskcache"
version = "5.6.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/3f/21/1c1ffc1a039ddcc459db43cc108658f32c57d271d7289a2794e401d0fdb6/diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc", size = 67916, upload-time = "2023-08-31T06:12:00.316Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3f/27/4570e78fc0bf5ea0ca45eb1de3818a23787af9b390c0b0a0033a1b8236f9/diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19", size = 45550, upload-time = "2023-08-31T06:11:58.822Z" },
]
[[package]]
name = "distlib"
version = "0.4.0"
@ -592,6 +601,63 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/51/c7/b64cae5dba3a1b138d7123ec36bb5ccd39d39939f18454407e5468f4763f/fsspec-2025.12.0-py3-none-any.whl", hash = "sha256:8bf1fe301b7d8acfa6e8571e3b1c3d158f909666642431cc78a1b7b4dbc5ec5b", size = 201422, upload-time = "2025-12-03T15:23:41.434Z" },
]
[[package]]
name = "google-auth"
version = "2.49.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cryptography" },
{ name = "pyasn1-modules" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ea/80/6a696a07d3d3b0a92488933532f03dbefa4a24ab80fb231395b9a2a1be77/google_auth-2.49.1.tar.gz", hash = "sha256:16d40da1c3c5a0533f57d268fe72e0ebb0ae1cc3b567024122651c045d879b64", size = 333825, upload-time = "2026-03-12T19:30:58.135Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/eb/c6c2478d8a8d633460be40e2a8a6f8f429171997a35a96f81d3b680dec83/google_auth-2.49.1-py3-none-any.whl", hash = "sha256:195ebe3dca18eddd1b3db5edc5189b76c13e96f29e73043b923ebcf3f1a860f7", size = 240737, upload-time = "2026-03-12T19:30:53.159Z" },
]
[package.optional-dependencies]
requests = [
{ name = "requests" },
]
[[package]]
name = "google-genai"
version = "1.68.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "distro" },
{ name = "google-auth", extra = ["requests"] },
{ name = "httpx" },
{ name = "pydantic" },
{ name = "requests" },
{ name = "sniffio" },
{ name = "tenacity" },
{ name = "typing-extensions" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9c/2c/f059982dbcb658cc535c81bbcbe7e2c040d675f4b563b03cdb01018a4bc3/google_genai-1.68.0.tar.gz", hash = "sha256:ac30c0b8bc630f9372993a97e4a11dae0e36f2e10d7c55eacdca95a9fa14ca96", size = 511285, upload-time = "2026-03-18T01:03:18.243Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/84/de/7d3ee9c94b74c3578ea4f88d45e8de9405902f857932334d81e89bce3dfa/google_genai-1.68.0-py3-none-any.whl", hash = "sha256:a1bc9919c0e2ea2907d1e319b65471d3d6d58c54822039a249fe1323e4178d15", size = 750912, upload-time = "2026-03-18T01:03:15.983Z" },
]
[[package]]
name = "graphiti-core"
version = "0.11.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "diskcache" },
{ name = "neo4j" },
{ name = "numpy" },
{ name = "openai" },
{ name = "pydantic" },
{ name = "python-dotenv" },
{ name = "tenacity" },
]
sdist = { url = "https://files.pythonhosted.org/packages/30/94/3f84400e5f02ea8e9dc79784202de4173cbc16f4b3ad1bd4302da888e4d8/graphiti_core-0.11.6.tar.gz", hash = "sha256:31d26621834d7d4b8865059ab749feb18af15937b59c69598a640a5dfabea331", size = 71928, upload-time = "2025-05-15T17:58:02.304Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ac/2e/c8f22f01585bf173d1c82f6d4615511aebc75aeda764c69aa394446fa93c/graphiti_core-0.11.6-py3-none-any.whl", hash = "sha256:6ec4807a884f5ea88b942d0c8b7bcd2e107c7358ab4f98ef2a2092c229929707", size = 111001, upload-time = "2025-05-15T17:58:00.542Z" },
]
[[package]]
name = "h11"
version = "0.16.0"
@ -1248,11 +1314,12 @@ dependencies = [
{ name = "charset-normalizer" },
{ name = "flask" },
{ name = "flask-cors" },
{ name = "google-genai" },
{ name = "graphiti-core" },
{ name = "openai" },
{ name = "pydantic" },
{ name = "pymupdf" },
{ name = "python-dotenv" },
{ name = "zep-cloud" },
]
[package.optional-dependencies]
@ -1276,6 +1343,8 @@ requires-dist = [
{ name = "charset-normalizer", specifier = ">=3.0.0" },
{ name = "flask", specifier = ">=3.0.0" },
{ name = "flask-cors", specifier = ">=6.0.0" },
{ name = "google-genai", specifier = ">=1.68.0" },
{ name = "graphiti-core", specifier = ">=0.3" },
{ name = "openai", specifier = ">=1.0.0" },
{ name = "pipreqs", marker = "extra == 'dev'", specifier = ">=0.5.0" },
{ name = "pydantic", specifier = ">=2.0.0" },
@ -1283,7 +1352,6 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" },
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23.0" },
{ name = "python-dotenv", specifier = ">=1.0.0" },
{ name = "zep-cloud", specifier = "==3.13.0" },
]
provides-extras = ["dev"]
@ -1916,6 +1984,27 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/8e/37/efad0257dc6e593a18957422533ff0f87ede7c9c6ea010a2177d738fb82f/pure_eval-0.2.3-py3-none-any.whl", hash = "sha256:1db8e35b67b3d218d818ae653e27f06c3aa420901fa7b081ca98cbedc874e0d0", size = 11842, upload-time = "2024-07-21T12:58:20.04Z" },
]
[[package]]
name = "pyasn1"
version = "0.6.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5c/5f/6583902b6f79b399c9c40674ac384fd9cd77805f9e6205075f828ef11fb2/pyasn1-0.6.3.tar.gz", hash = "sha256:697a8ecd6d98891189184ca1fa05d1bb00e2f84b5977c481452050549c8a72cf", size = 148685, upload-time = "2026-03-17T01:06:53.382Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5d/a0/7d793dce3fa811fe047d6ae2431c672364b462850c6235ae306c0efd025f/pyasn1-0.6.3-py3-none-any.whl", hash = "sha256:a80184d120f0864a52a073acc6fc642847d0be408e7c7252f31390c0f4eadcde", size = 83997, upload-time = "2026-03-17T01:06:52.036Z" },
]
[[package]]
name = "pyasn1-modules"
version = "0.4.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pyasn1" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e9/e6/78ebbb10a8c8e4b61a59249394a4a594c1a7af95593dc933a349c8d00964/pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6", size = 307892, upload-time = "2025-03-28T02:41:22.17Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/47/8d/d529b5d697919ba8c11ad626e835d4039be708a35b0d22de83a269a6682c/pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a", size = 181259, upload-time = "2025-03-28T02:41:19.028Z" },
]
[[package]]
name = "pycparser"
version = "2.23"
@ -2987,6 +3076,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252, upload-time = "2022-10-06T17:21:44.262Z" },
]
[[package]]
name = "tenacity"
version = "9.1.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/47/c6/ee486fd809e357697ee8a44d3d69222b344920433d3b6666ccd9b374630c/tenacity-9.1.4.tar.gz", hash = "sha256:adb31d4c263f2bd041081ab33b498309a57c77f9acf2db65aadf0898179cf93a", size = 49413, upload-time = "2026-02-07T10:45:33.841Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d7/c1/eb8f9debc45d3b7918a32ab756658a0904732f75e555402972246b0b8e71/tenacity-9.1.4-py3-none-any.whl", hash = "sha256:6095a360c919085f28c6527de529e76a06ad89b23659fa881ae0649b867a9d55", size = 28926, upload-time = "2026-02-07T10:45:32.24Z" },
]
[[package]]
name = "texttable"
version = "1.7.0"
@ -3488,19 +3586,3 @@ sdist = { url = "https://files.pythonhosted.org/packages/d4/c8/cc640404a0981e6c1
wheels = [
{ url = "https://files.pythonhosted.org/packages/8b/90/89a2ff242ccab6a24fbab18dbbabc67c51a6f0ed01f9a0f41689dc177419/yarg-0.1.9-py2.py3-none-any.whl", hash = "sha256:4f9cebdc00fac946c9bf2783d634e538a71c7d280a4d806d45fd4dc0ef441492", size = 19162, upload-time = "2014-08-11T22:01:41.104Z" },
]
[[package]]
name = "zep-cloud"
version = "3.13.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
{ name = "pydantic" },
{ name = "pydantic-core" },
{ name = "python-dateutil" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/32/c7/c835debf13302f8aaf8d0561ac6ff5a9bc15cc140cd692a1330fb1900c55/zep_cloud-3.13.0.tar.gz", hash = "sha256:c55d9c511773bb2177ae8e08546141404f87d2099affafabd7ec4b4505763e48", size = 63116, upload-time = "2025-11-20T15:25:40.745Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f7/e1/bbf03c6c8007c0cb238780e7fc6d8e1a52633893933a41aa09678618985a/zep_cloud-3.13.0-py3-none-any.whl", hash = "sha256:b2fbdeef73e262194c8f67b58f76471de6ee87e1a629541a09d8f7bbf475f12b", size = 110601, upload-time = "2025-11-20T15:25:38.484Z" },
]

View File

@ -1435,7 +1435,6 @@
"resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz",
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"license": "ISC",
"peer": true,
"engines": {
"node": ">=12"
}
@ -1913,7 +1912,6 @@
"integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@ -2053,7 +2051,6 @@
"integrity": "sha512-ITcnkFeR3+fI8P1wMgItjGrR10170d8auB4EpMLPqmx6uxElH3a/hHGQabSHKdqd4FXWO1nFIp9rRn7JQ34ACQ==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.5.0",
@ -2128,7 +2125,6 @@
"resolved": "https://registry.npmjs.org/vue/-/vue-3.5.25.tgz",
"integrity": "sha512-YLVdgv2K13WJ6n+kD5owehKtEXwdwXuj2TTyJMsO7pSeKw2bfRNZGjhB7YzrpbMYj5b5QsUebHpOqR3R3ziy/g==",
"license": "MIT",
"peer": true,
"dependencies": {
"@vue/compiler-dom": "3.5.25",
"@vue/compiler-sfc": "3.5.25",