diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 11857ef0..2d6519c2 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -1,12 +1,10 @@ -""" -MiroFish Backend - Flask应用工厂 -""" +"""MiroFish backend Flask application factory.""" import os import warnings -# 抑制 multiprocessing resource_tracker 的警告(来自第三方库如 transformers) -# 需要在所有其他导入之前设置 +# Silence multiprocessing.resource_tracker warnings emitted by some third-party +# libraries (e.g. transformers); must run before those modules are imported. warnings.filterwarnings("ignore", message=".*resource_tracker.*") from flask import Flask, request @@ -18,62 +16,65 @@ from .utils.locale import t def create_app(config_class=Config): - """Flask应用工厂函数""" + """Flask application factory.""" app = Flask(__name__) app.config.from_object(config_class) - - # 设置JSON编码:确保中文直接显示(而不是 \uXXXX 格式) - # Flask >= 2.3 使用 app.json.ensure_ascii,旧版本使用 JSON_AS_ASCII 配置 + + # Configure JSON encoding so non-ASCII characters render literally + # rather than as \uXXXX escape sequences. Flask >= 2.3 exposes + # ``app.json.ensure_ascii``; older versions use ``JSON_AS_ASCII``. if hasattr(app, 'json') and hasattr(app.json, 'ensure_ascii'): app.json.ensure_ascii = False - - # 设置日志 + + # Configure logging. logger = setup_logger('mirofish') - - # 只在 reloader 子进程中打印启动信息(避免 debug 模式下打印两次) + + # Only print startup banners in the reloader child process to avoid + # double-printing in debug mode. is_reloader_process = os.environ.get('WERKZEUG_RUN_MAIN') == 'true' debug_mode = app.config.get('DEBUG', False) should_log_startup = not debug_mode or is_reloader_process - + if should_log_startup: logger.info("=" * 50) logger.info(t("log.bootstrap.m001")) logger.info("=" * 50) - - # 启用CORS + + # Enable CORS. CORS(app, resources={r"/api/*": {"origins": "*"}}) - - # 注册模拟进程清理函数(确保服务器关闭时终止所有模拟进程) + + # Register simulation-process cleanup so all child processes are torn down + # when the Flask server shuts down. from .services.simulation_runner import SimulationRunner SimulationRunner.register_cleanup() if should_log_startup: logger.info(t("log.bootstrap.m002")) - - # 请求日志中间件 + + # Request-logging middleware. @app.before_request def log_request(): logger = get_logger('mirofish.request') logger.debug(t("log.bootstrap.m003", request=request.method, request_2=request.path)) if request.content_type and 'json' in request.content_type: logger.debug(t("log.bootstrap.m004", request=request.get_json(silent=True))) - + @app.after_request def log_response(response): logger = get_logger('mirofish.request') logger.debug(t("log.bootstrap.m005", response=response.status_code)) return response - - # 注册蓝图 + + # Register API blueprints. from .api import graph_bp, simulation_bp, report_bp app.register_blueprint(graph_bp, url_prefix='/api/graph') app.register_blueprint(simulation_bp, url_prefix='/api/simulation') app.register_blueprint(report_bp, url_prefix='/api/report') - - # 健康检查 + + # Health-check endpoint. @app.route('/health') def health(): return {'status': 'ok', 'service': 'MiroFish Backend'} - + # On startup: recover any projects stuck in graph_building (task was killed by restart) if should_log_startup: _recover_stuck_projects() diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index ffda743a..4326e4da 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -1,6 +1,4 @@ -""" -API路由模块 -""" +"""API blueprints package.""" from flask import Blueprint diff --git a/backend/app/config.py b/backend/app/config.py index e6939c78..ab0867d3 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,38 +1,40 @@ -""" -配置管理 -统一从项目根目录的 .env 文件加载配置 +"""Configuration management. + +Loads configuration values from the project-root ``.env`` file. """ import os from dotenv import load_dotenv -# 加载项目根目录的 .env 文件 -# 路径: MiroFish/.env (相对于 backend/app/config.py) +# Load the project-root .env file. +# Path: MiroFish/.env (relative to backend/app/config.py). project_root_env = os.path.join(os.path.dirname(__file__), '../../.env') if os.path.exists(project_root_env): load_dotenv(project_root_env, override=True) else: - # 如果根目录没有 .env,尝试加载环境变量(用于生产环境) + # If the project root has no .env, fall back to the process environment + # (used in production deployments). load_dotenv(override=True) class Config: - """Flask配置类""" - - # Flask配置 + """Flask configuration class.""" + + # Flask settings. SECRET_KEY = os.environ.get('SECRET_KEY', 'mirofish-secret-key') DEBUG = os.environ.get('FLASK_DEBUG', 'True').lower() == 'true' - - # JSON配置 - 禁用ASCII转义,让中文直接显示(而不是 \uXXXX 格式) + + # JSON settings: disable ASCII escaping so non-ASCII output renders literally + # rather than as \uXXXX escape sequences. JSON_AS_ASCII = False - - # LLM配置(统一使用OpenAI格式) + + # LLM settings (called via the OpenAI-compatible API surface). LLM_API_KEY = os.environ.get('LLM_API_KEY') LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'https://api.openai.com/v1') LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'gpt-4o-mini') - - # Neo4j + Graphiti配置(替代 Zep Cloud) + + # Neo4j + Graphiti settings (replacement for Zep Cloud). NEO4J_URI = os.environ.get('NEO4J_URI', 'bolt://localhost:7687') NEO4J_USER = os.environ.get('NEO4J_USER', 'neo4j') NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', 'mirofish123') @@ -50,23 +52,23 @@ class Config: EMBEDDING_API_KEY = os.environ.get('EMBEDDING_API_KEY') EMBEDDING_BASE_URL = os.environ.get('EMBEDDING_BASE_URL') - # Zep配置(保留兼容性,已废弃) + # Zep settings (kept for backwards compatibility; deprecated). ZEP_API_KEY = os.environ.get('ZEP_API_KEY', '') - - # 文件上传配置 + + # File upload settings. MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '../uploads') ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'} - - # 文本处理配置 - DEFAULT_CHUNK_SIZE = 500 # 默认切块大小 - DEFAULT_CHUNK_OVERLAP = 50 # 默认重叠大小 - - # OASIS模拟配置 + + # Text processing settings. + DEFAULT_CHUNK_SIZE = 500 # default chunk size in characters + DEFAULT_CHUNK_OVERLAP = 50 # default overlap in characters + + # OASIS simulation settings. OASIS_DEFAULT_MAX_ROUNDS = int(os.environ.get('OASIS_DEFAULT_MAX_ROUNDS', '10')) OASIS_SIMULATION_DATA_DIR = os.path.join(os.path.dirname(__file__), '../uploads/simulations') - - # OASIS平台可用动作配置 + + # OASIS per-platform allowed action lists. OASIS_TWITTER_ACTIONS = [ 'CREATE_POST', 'LIKE_POST', 'REPOST', 'FOLLOW', 'DO_NOTHING', 'QUOTE_POST' ] @@ -76,14 +78,14 @@ class Config: 'TREND', 'REFRESH', 'DO_NOTHING', 'FOLLOW', 'MUTE' ] - # Report Agent配置 + # Report agent settings. REPORT_AGENT_MAX_TOOL_CALLS = int(os.environ.get('REPORT_AGENT_MAX_TOOL_CALLS', '5')) REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2')) REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5')) - + @classmethod def validate(cls): - """验证必要配置""" + """Validate that required configuration values are present.""" errors = [] if not cls.LLM_API_KEY: errors.append("LLM_API_KEY 未配置") diff --git a/backend/app/services/simulation_ipc.py b/backend/app/services/simulation_ipc.py index be2eac32..68428b8f 100644 --- a/backend/app/services/simulation_ipc.py +++ b/backend/app/services/simulation_ipc.py @@ -1,11 +1,12 @@ -""" -模拟IPC通信模块 -用于Flask后端和模拟脚本之间的进程间通信 +"""Simulation IPC module. -通过文件系统实现简单的命令/响应模式: -1. Flask写入命令到 commands/ 目录 -2. 模拟脚本轮询命令目录,执行命令并写入响应到 responses/ 目录 -3. Flask轮询响应目录获取结果 +Inter-process communication between the Flask backend and the simulation +subprocess. Implements a simple file-system command/response pattern: + +1. Flask writes commands into ``commands/``. +2. The simulation script polls for commands, executes them, and writes + responses into ``responses/``. +3. Flask polls the responses directory for results. """ import os @@ -24,14 +25,14 @@ logger = get_logger('mirofish.simulation_ipc') class CommandType(str, Enum): - """命令类型""" - INTERVIEW = "interview" # 单个Agent采访 - BATCH_INTERVIEW = "batch_interview" # 批量采访 - CLOSE_ENV = "close_env" # 关闭环境 + """IPC command types.""" + INTERVIEW = "interview" # interview a single agent + BATCH_INTERVIEW = "batch_interview" # interview multiple agents at once + CLOSE_ENV = "close_env" # tear down the environment class CommandStatus(str, Enum): - """命令状态""" + """IPC command status.""" PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" @@ -40,12 +41,12 @@ class CommandStatus(str, Enum): @dataclass class IPCCommand: - """IPC命令""" + """A command sent over the IPC channel.""" command_id: str command_type: CommandType args: Dict[str, Any] timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) - + def to_dict(self) -> Dict[str, Any]: return { "command_id": self.command_id, @@ -53,7 +54,7 @@ class IPCCommand: "args": self.args, "timestamp": self.timestamp } - + @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'IPCCommand': return cls( @@ -66,13 +67,13 @@ class IPCCommand: @dataclass class IPCResponse: - """IPC响应""" + """A response returned over the IPC channel.""" command_id: str status: CommandStatus result: Optional[Dict[str, Any]] = None error: Optional[str] = None timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) - + def to_dict(self) -> Dict[str, Any]: return { "command_id": self.command_id, @@ -81,7 +82,7 @@ class IPCResponse: "error": self.error, "timestamp": self.timestamp } - + @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'IPCResponse': return cls( @@ -94,27 +95,25 @@ class IPCResponse: class SimulationIPCClient: + """IPC client used by the Flask side. + + Sends commands to the simulation process and waits for responses. """ - 模拟IPC客户端(Flask端使用) - - 用于向模拟进程发送命令并等待响应 - """ - + def __init__(self, simulation_dir: str): - """ - 初始化IPC客户端 - + """Initialize the IPC client. + Args: - simulation_dir: 模拟数据目录 + simulation_dir: Directory holding the simulation's IPC files. """ self.simulation_dir = simulation_dir self.commands_dir = os.path.join(simulation_dir, "ipc_commands") self.responses_dir = os.path.join(simulation_dir, "ipc_responses") - - # 确保目录存在 + + # Ensure both directories exist before use. os.makedirs(self.commands_dir, exist_ok=True) os.makedirs(self.responses_dir, exist_ok=True) - + def send_command( self, command_type: CommandType, @@ -122,20 +121,19 @@ class SimulationIPCClient: timeout: float = 60.0, poll_interval: float = 0.5 ) -> IPCResponse: - """ - 发送命令并等待响应 - + """Send a command and wait for the response. + Args: - command_type: 命令类型 - args: 命令参数 - timeout: 超时时间(秒) - poll_interval: 轮询间隔(秒) - + command_type: Command type to send. + args: Command arguments. + timeout: Timeout in seconds. + poll_interval: Polling interval in seconds. + Returns: - IPCResponse - + The ``IPCResponse``. + Raises: - TimeoutError: 等待响应超时 + TimeoutError: When no response arrives before ``timeout``. """ command_id = str(uuid.uuid4()) command = IPCCommand( @@ -143,50 +141,50 @@ class SimulationIPCClient: command_type=command_type, args=args ) - - # 写入命令文件 + + # Write the command file. command_file = os.path.join(self.commands_dir, f"{command_id}.json") with open(command_file, 'w', encoding='utf-8') as f: json.dump(command.to_dict(), f, ensure_ascii=False, indent=2) - + logger.info(t("log.simulation_ipc.m001", command_type=command_type.value, command_id=command_id)) - - # 等待响应 + + # Poll for the response file. response_file = os.path.join(self.responses_dir, f"{command_id}.json") start_time = time.time() - + while time.time() - start_time < timeout: if os.path.exists(response_file): try: with open(response_file, 'r', encoding='utf-8') as f: response_data = json.load(f) response = IPCResponse.from_dict(response_data) - - # 清理命令和响应文件 + + # Clean up command and response files after successful read. try: os.remove(command_file) os.remove(response_file) except OSError: pass - + logger.info(t("log.simulation_ipc.m002", command_id=command_id, response=response.status.value)) return response except (json.JSONDecodeError, KeyError) as e: logger.warning(t("log.simulation_ipc.m003", e=e)) - + time.sleep(poll_interval) - - # 超时 + + # Timed out waiting for the response. logger.error(t("log.simulation_ipc.m004", command_id=command_id)) - - # 清理命令文件 + + # Clean up the unanswered command file. try: os.remove(command_file) except OSError: pass - + raise TimeoutError(f"等待命令响应超时 ({timeout}秒)") - + def send_interview( self, agent_id: int, @@ -194,20 +192,19 @@ class SimulationIPCClient: platform: str = None, timeout: float = 60.0 ) -> IPCResponse: - """ - 发送单个Agent采访命令 - + """Send a single-agent interview command. + Args: - agent_id: Agent ID - prompt: 采访问题 - platform: 指定平台(可选) - - "twitter": 只采访Twitter平台 - - "reddit": 只采访Reddit平台 - - None: 双平台模拟时同时采访两个平台,单平台模拟时采访该平台 - timeout: 超时时间 - + agent_id: Agent id to interview. + prompt: Interview question. + platform: Optional platform selector. + - ``"twitter"``: interview only on Twitter. + - ``"reddit"``: interview only on Reddit. + - ``None``: dual-platform if applicable, else the single active platform. + timeout: Timeout in seconds. + Returns: - IPCResponse,result字段包含采访结果 + ``IPCResponse`` whose ``result`` carries the interview response. """ args = { "agent_id": agent_id, @@ -215,69 +212,66 @@ class SimulationIPCClient: } if platform: args["platform"] = platform - + return self.send_command( command_type=CommandType.INTERVIEW, args=args, timeout=timeout ) - + def send_batch_interview( self, interviews: List[Dict[str, Any]], platform: str = None, timeout: float = 120.0 ) -> IPCResponse: - """ - 发送批量采访命令 - + """Send a batched interview command. + Args: - interviews: 采访列表,每个元素包含 {"agent_id": int, "prompt": str, "platform": str(可选)} - platform: 默认平台(可选,会被每个采访项的platform覆盖) - - "twitter": 默认只采访Twitter平台 - - "reddit": 默认只采访Reddit平台 - - None: 双平台模拟时每个Agent同时采访两个平台 - timeout: 超时时间 - + interviews: List of items shaped ``{"agent_id": int, "prompt": str, "platform": str?}``. + platform: Default platform; per-item ``platform`` overrides this. + - ``"twitter"``: default to Twitter. + - ``"reddit"``: default to Reddit. + - ``None``: dual-platform interview when applicable. + timeout: Timeout in seconds. + Returns: - IPCResponse,result字段包含所有采访结果 + ``IPCResponse`` whose ``result`` carries every interview response. """ args = {"interviews": interviews} if platform: args["platform"] = platform - + return self.send_command( command_type=CommandType.BATCH_INTERVIEW, args=args, timeout=timeout ) - + def send_close_env(self, timeout: float = 30.0) -> IPCResponse: - """ - 发送关闭环境命令 - + """Send a tear-down-environment command. + Args: - timeout: 超时时间 - + timeout: Timeout in seconds. + Returns: - IPCResponse + ``IPCResponse``. """ return self.send_command( command_type=CommandType.CLOSE_ENV, args={}, timeout=timeout ) - + def check_env_alive(self) -> bool: - """ - 检查模拟环境是否存活 - - 通过检查 env_status.json 文件来判断 + """Return ``True`` if the simulation environment reports as alive. + + Reads ``env_status.json`` written by the IPC server side. """ status_file = os.path.join(self.simulation_dir, "env_status.json") if not os.path.exists(status_file): return False - + try: with open(status_file, 'r', encoding='utf-8') as f: status = json.load(f) @@ -287,68 +281,65 @@ class SimulationIPCClient: class SimulationIPCServer: + """IPC server used by the simulation script. + + Polls the commands directory, executes commands, and writes responses. """ - 模拟IPC服务器(模拟脚本端使用) - - 轮询命令目录,执行命令并返回响应 - """ - + def __init__(self, simulation_dir: str): - """ - 初始化IPC服务器 - + """Initialize the IPC server. + Args: - simulation_dir: 模拟数据目录 + simulation_dir: Directory holding the simulation's IPC files. """ self.simulation_dir = simulation_dir self.commands_dir = os.path.join(simulation_dir, "ipc_commands") self.responses_dir = os.path.join(simulation_dir, "ipc_responses") - - # 确保目录存在 + + # Ensure both directories exist before use. os.makedirs(self.commands_dir, exist_ok=True) os.makedirs(self.responses_dir, exist_ok=True) - - # 环境状态 + + # Server-running flag. self._running = False - + def start(self): - """标记服务器为运行状态""" + """Mark the server as alive and persist the state.""" self._running = True self._update_env_status("alive") - + def stop(self): - """标记服务器为停止状态""" + """Mark the server as stopped and persist the state.""" self._running = False self._update_env_status("stopped") - + def _update_env_status(self, status: str): - """更新环境状态文件""" + """Update the persistent environment-status file.""" status_file = os.path.join(self.simulation_dir, "env_status.json") with open(status_file, 'w', encoding='utf-8') as f: json.dump({ "status": status, "timestamp": datetime.now().isoformat() }, f, ensure_ascii=False, indent=2) - + def poll_commands(self) -> Optional[IPCCommand]: - """ - 轮询命令目录,返回第一个待处理的命令 - + """Poll the commands directory and return the next pending command. + Returns: - IPCCommand 或 None + ``IPCCommand`` or ``None`` if no pending commands remain. """ if not os.path.exists(self.commands_dir): return None - - # 按时间排序获取命令文件 + + # Sort by mtime so we process commands in arrival order. command_files = [] for filename in os.listdir(self.commands_dir): if filename.endswith('.json'): filepath = os.path.join(self.commands_dir, filename) command_files.append((filepath, os.path.getmtime(filepath))) - + command_files.sort(key=lambda x: x[1]) - + for filepath, _ in command_files: try: with open(filepath, 'r', encoding='utf-8') as f: @@ -357,37 +348,36 @@ class SimulationIPCServer: except (json.JSONDecodeError, KeyError, OSError) as e: logger.warning(t("log.simulation_ipc.m005", filepath=filepath, e=e)) continue - + return None - + def send_response(self, response: IPCResponse): - """ - 发送响应 - + """Write a response file. + Args: - response: IPC响应 + response: The response to send. """ response_file = os.path.join(self.responses_dir, f"{response.command_id}.json") with open(response_file, 'w', encoding='utf-8') as f: json.dump(response.to_dict(), f, ensure_ascii=False, indent=2) - - # 删除命令文件 + + # Delete the matching command file. command_file = os.path.join(self.commands_dir, f"{response.command_id}.json") try: os.remove(command_file) except OSError: pass - + def send_success(self, command_id: str, result: Dict[str, Any]): - """发送成功响应""" + """Send a success response.""" self.send_response(IPCResponse( command_id=command_id, status=CommandStatus.COMPLETED, result=result )) - + def send_error(self, command_id: str, error: str): - """发送错误响应""" + """Send a failure response.""" self.send_response(IPCResponse( command_id=command_id, status=CommandStatus.FAILED, diff --git a/backend/app/services/simulation_manager.py b/backend/app/services/simulation_manager.py index 2f297e2c..b1af480f 100644 --- a/backend/app/services/simulation_manager.py +++ b/backend/app/services/simulation_manager.py @@ -1,7 +1,7 @@ -""" -OASIS模拟管理器 -管理Twitter和Reddit双平台并行模拟 -使用预设脚本 + LLM智能生成配置参数 +"""OASIS simulation manager. + +Drives parallel Twitter + Reddit simulations using preset scripts plus +LLM-generated configuration parameters. """ import os @@ -23,60 +23,60 @@ logger = get_logger('mirofish.simulation') class SimulationStatus(str, Enum): - """模拟状态""" + """Simulation lifecycle status.""" CREATED = "created" PREPARING = "preparing" READY = "ready" RUNNING = "running" PAUSED = "paused" - STOPPED = "stopped" # 模拟被手动停止 - COMPLETED = "completed" # 模拟自然完成 + STOPPED = "stopped" # manually stopped + COMPLETED = "completed" # finished naturally FAILED = "failed" class PlatformType(str, Enum): - """平台类型""" + """Simulated platform types.""" TWITTER = "twitter" REDDIT = "reddit" @dataclass class SimulationState: - """模拟状态""" + """In-memory + persisted state for a single simulation.""" simulation_id: str project_id: str graph_id: str - - # 平台启用状态 + + # Per-platform enable flags. enable_twitter: bool = True enable_reddit: bool = True - - # 状态 + + # Lifecycle status. status: SimulationStatus = SimulationStatus.CREATED - - # 准备阶段数据 + + # Counters captured during the prepare phase. entities_count: int = 0 profiles_count: int = 0 entity_types: List[str] = field(default_factory=list) - - # 配置生成信息 + + # Information about the auto-generated config. config_generated: bool = False config_reasoning: str = "" - - # 运行时数据 + + # Runtime data. current_round: int = 0 twitter_status: str = "not_started" reddit_status: str = "not_started" - - # 时间戳 + + # Timestamps. created_at: str = field(default_factory=lambda: datetime.now().isoformat()) updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) - - # 错误信息 + + # Error message when status == FAILED. error: Optional[str] = None - + def to_dict(self) -> Dict[str, Any]: - """完整状态字典(内部使用)""" + """Full state dict (used for persistence and internal callers).""" return { "simulation_id": self.simulation_id, "project_id": self.project_id, @@ -96,9 +96,9 @@ class SimulationState: "updated_at": self.updated_at, "error": self.error, } - + def to_simple_dict(self) -> Dict[str, Any]: - """简化状态字典(API返回使用)""" + """Simplified state dict (used for API responses).""" return { "simulation_id": self.simulation_id, "project_id": self.project_id, @@ -113,61 +113,60 @@ class SimulationState: class SimulationManager: + """Simulation manager. + + Core responsibilities: + 1. Read entities from the Zep graph and filter to the configured types. + 2. Generate OASIS agent profiles per entity. + 3. Use the LLM to generate simulation configuration parameters. + 4. Materialize the files the preset scripts expect. """ - 模拟管理器 - - 核心功能: - 1. 从Zep图谱读取实体并过滤 - 2. 生成OASIS Agent Profile - 3. 使用LLM智能生成模拟配置参数 - 4. 准备预设脚本所需的所有文件 - """ - - # 模拟数据存储目录 + + # Root directory for persisted simulation data. SIMULATION_DATA_DIR = os.path.join( - os.path.dirname(__file__), + os.path.dirname(__file__), '../../uploads/simulations' ) - + def __init__(self): - # 确保目录存在 + # Ensure the simulation data directory exists. os.makedirs(self.SIMULATION_DATA_DIR, exist_ok=True) - - # 内存中的模拟状态缓存 + + # In-memory cache of simulation state objects. self._simulations: Dict[str, SimulationState] = {} - + def _get_simulation_dir(self, simulation_id: str) -> str: - """获取模拟数据目录""" + """Return the on-disk directory for a simulation, creating if missing.""" sim_dir = os.path.join(self.SIMULATION_DATA_DIR, simulation_id) os.makedirs(sim_dir, exist_ok=True) return sim_dir - + def _save_simulation_state(self, state: SimulationState): - """保存模拟状态到文件""" + """Persist a simulation state to disk and update the cache.""" sim_dir = self._get_simulation_dir(state.simulation_id) state_file = os.path.join(sim_dir, "state.json") - + state.updated_at = datetime.now().isoformat() - + with open(state_file, 'w', encoding='utf-8') as f: json.dump(state.to_dict(), f, ensure_ascii=False, indent=2) - + self._simulations[state.simulation_id] = state - + def _load_simulation_state(self, simulation_id: str) -> Optional[SimulationState]: - """从文件加载模拟状态""" + """Load a simulation state from disk (or cache) by id.""" if simulation_id in self._simulations: return self._simulations[simulation_id] - + sim_dir = self._get_simulation_dir(simulation_id) state_file = os.path.join(sim_dir, "state.json") - + if not os.path.exists(state_file): return None - + with open(state_file, 'r', encoding='utf-8') as f: data = json.load(f) - + state = SimulationState( simulation_id=simulation_id, project_id=data.get("project_id", ""), @@ -187,10 +186,10 @@ class SimulationManager: updated_at=data.get("updated_at", datetime.now().isoformat()), error=data.get("error"), ) - + self._simulations[simulation_id] = state return state - + def create_simulation( self, project_id: str, @@ -198,21 +197,20 @@ class SimulationManager: enable_twitter: bool = True, enable_reddit: bool = True, ) -> SimulationState: - """ - 创建新的模拟 - + """Create a new simulation in the ``CREATED`` state. + Args: - project_id: 项目ID - graph_id: Zep图谱ID - enable_twitter: 是否启用Twitter模拟 - enable_reddit: 是否启用Reddit模拟 - + project_id: Owning project id. + graph_id: Source Zep graph id. + enable_twitter: When ``True``, the Twitter simulation runs. + enable_reddit: When ``True``, the Reddit simulation runs. + Returns: - SimulationState + The created ``SimulationState``. """ import uuid simulation_id = f"sim_{uuid.uuid4().hex[:12]}" - + state = SimulationState( simulation_id=simulation_id, project_id=project_id, @@ -221,12 +219,12 @@ class SimulationManager: enable_reddit=enable_reddit, status=SimulationStatus.CREATED, ) - + self._save_simulation_state(state) logger.info(t("log.simulation_manager.m001", simulation_id=simulation_id, project_id=project_id, graph_id=graph_id)) - + return state - + def prepare_simulation( self, simulation_id: str, @@ -237,56 +235,55 @@ class SimulationManager: progress_callback: Optional[callable] = None, parallel_profile_count: int = 3 ) -> SimulationState: - """ - 准备模拟环境(全程自动化) - - 步骤: - 1. 从Zep图谱读取并过滤实体 - 2. 为每个实体生成OASIS Agent Profile(可选LLM增强,支持并行) - 3. 使用LLM智能生成模拟配置参数(时间、活跃度、发言频率等) - 4. 保存配置文件和Profile文件 - 5. 复制预设脚本到模拟目录 - + """Prepare the simulation environment end-to-end. + + Steps: + 1. Read and filter entities from the graph. + 2. Generate OASIS agent profiles (optional LLM enrichment, parallel-capable). + 3. Use the LLM to produce simulation parameters (timing, activity, posting frequency). + 4. Save the configuration and profile files. + 5. Copy preset scripts into the simulation directory. + Args: - simulation_id: 模拟ID - simulation_requirement: 模拟需求描述(用于LLM生成配置) - document_text: 原始文档内容(用于LLM理解背景) - defined_entity_types: 预定义的实体类型(可选) - use_llm_for_profiles: 是否使用LLM生成详细人设 - progress_callback: 进度回调函数 (stage, progress, message) - parallel_profile_count: 并行生成人设的数量,默认3 - + simulation_id: Simulation id. + simulation_requirement: Free-text description of the simulation goal. + document_text: Raw source document text passed to the LLM for context. + defined_entity_types: Optional list of allowed entity types. + use_llm_for_profiles: When ``True``, enrich profiles via the LLM. + progress_callback: Optional callback ``(stage, progress, message, **extras)``. + parallel_profile_count: Number of profile generations to run in parallel. + Returns: - SimulationState + The updated ``SimulationState``. """ state = self._load_simulation_state(simulation_id) if not state: raise ValueError(f"模拟不存在: {simulation_id}") - + try: state.status = SimulationStatus.PREPARING self._save_simulation_state(state) - + sim_dir = self._get_simulation_dir(simulation_id) - - # ========== 阶段1: 读取并过滤实体 ========== + + # ========== Stage 1: read and filter entities ========== if progress_callback: progress_callback("reading", 0, t('progress.connectingZepGraph')) - + reader = ZepEntityReader() - + if progress_callback: progress_callback("reading", 30, t('progress.readingNodeData')) - + filtered = reader.filter_defined_entities( graph_id=state.graph_id, defined_entity_types=defined_entity_types, enrich_with_edges=True ) - + state.entities_count = filtered.filtered_count state.entity_types = list(filtered.entity_types) - + if progress_callback: progress_callback( "reading", 100, @@ -294,16 +291,16 @@ class SimulationManager: current=filtered.filtered_count, total=filtered.filtered_count ) - + if filtered.filtered_count == 0: state.status = SimulationStatus.FAILED state.error = "没有找到符合条件的实体,请检查图谱是否正确构建" self._save_simulation_state(state) return state - - # ========== 阶段2: 生成Agent Profile ========== + + # ========== Stage 2: generate agent profiles ========== total_entities = len(filtered.entities) - + if progress_callback: progress_callback( "generating_profiles", 0, @@ -311,22 +308,22 @@ class SimulationManager: current=0, total=total_entities ) - - # 传入graph_id以启用Zep检索功能,获取更丰富的上下文 + + # Pass the graph_id so the generator can use Zep retrieval for richer context. generator = OasisProfileGenerator(graph_id=state.graph_id) - + def profile_progress(current, total, msg): if progress_callback: progress_callback( - "generating_profiles", - int(current / total * 100), + "generating_profiles", + int(current / total * 100), msg, current=current, total=total, item_name=msg ) - - # 设置实时保存的文件路径(优先使用 Reddit JSON 格式) + + # Configure the realtime save target (prefer Reddit JSON if Reddit is enabled). realtime_output_path = None realtime_platform = "reddit" if state.enable_reddit: @@ -335,21 +332,21 @@ class SimulationManager: elif state.enable_twitter: realtime_output_path = os.path.join(sim_dir, "twitter_profiles.csv") realtime_platform = "twitter" - + profiles = generator.generate_profiles_from_entities( entities=filtered.entities, use_llm=use_llm_for_profiles, progress_callback=profile_progress, - graph_id=state.graph_id, # 传入graph_id用于Zep检索 - parallel_count=parallel_profile_count, # 并行生成数量 - realtime_output_path=realtime_output_path, # 实时保存路径 - output_platform=realtime_platform # 输出格式 + graph_id=state.graph_id, # used for Zep retrieval enrichment + parallel_count=parallel_profile_count, + realtime_output_path=realtime_output_path, + output_platform=realtime_platform ) - + state.profiles_count = len(profiles) - - # 保存Profile文件(注意:Twitter使用CSV格式,Reddit使用JSON格式) - # Reddit 已经在生成过程中实时保存了,这里再保存一次确保完整性 + + # Save profile files. Reddit also writes JSON during generation; this is + # a final consistency write. Twitter requires CSV per OASIS conventions. if progress_callback: progress_callback( "generating_profiles", 95, @@ -357,22 +354,22 @@ class SimulationManager: current=total_entities, total=total_entities ) - + if state.enable_reddit: generator.save_profiles( profiles=profiles, file_path=os.path.join(sim_dir, "reddit_profiles.json"), platform="reddit" ) - + if state.enable_twitter: - # Twitter使用CSV格式!这是OASIS的要求 + # Twitter uses CSV format — required by OASIS. generator.save_profiles( profiles=profiles, file_path=os.path.join(sim_dir, "twitter_profiles.csv"), platform="twitter" ) - + if progress_callback: progress_callback( "generating_profiles", 100, @@ -380,8 +377,8 @@ class SimulationManager: current=len(profiles), total=len(profiles) ) - - # ========== 阶段3: LLM智能生成模拟配置 ========== + + # ========== Stage 3: LLM-driven simulation config ========== if progress_callback: progress_callback( "generating_config", 0, @@ -389,9 +386,9 @@ class SimulationManager: current=0, total=3 ) - + config_generator = SimulationConfigGenerator() - + if progress_callback: progress_callback( "generating_config", 30, @@ -399,7 +396,7 @@ class SimulationManager: current=1, total=3 ) - + sim_params = config_generator.generate_config( simulation_id=simulation_id, project_id=state.project_id, @@ -410,7 +407,7 @@ class SimulationManager: enable_twitter=state.enable_twitter, enable_reddit=state.enable_reddit ) - + if progress_callback: progress_callback( "generating_config", 70, @@ -418,15 +415,15 @@ class SimulationManager: current=2, total=3 ) - - # 保存配置文件 + + # Save the configuration file. config_path = os.path.join(sim_dir, "simulation_config.json") with open(config_path, 'w', encoding='utf-8') as f: f.write(sim_params.to_json()) - + state.config_generated = True state.config_reasoning = sim_params.generation_reasoning - + if progress_callback: progress_callback( "generating_config", 100, @@ -434,18 +431,17 @@ class SimulationManager: current=3, total=3 ) - - # 注意:运行脚本保留在 backend/scripts/ 目录,不再复制到模拟目录 - # 启动模拟时,simulation_runner 会从 scripts/ 目录运行脚本 - - # 更新状态 + + # The runtime scripts now live under backend/scripts/; we no longer copy + # them per-simulation. simulation_runner invokes them in place. + state.status = SimulationStatus.READY self._save_simulation_state(state) - + logger.info(t("log.simulation_manager.m002", simulation_id=simulation_id, state=state.entities_count, state_2=state.profiles_count)) - + return state - + except Exception as e: logger.error(t("log.simulation_manager.m003", simulation_id=simulation_id, str=str(e))) import traceback @@ -454,61 +450,61 @@ class SimulationManager: state.error = str(e) self._save_simulation_state(state) raise - + def get_simulation(self, simulation_id: str) -> Optional[SimulationState]: - """获取模拟状态""" + """Return the simulation's state, or ``None`` if unknown.""" return self._load_simulation_state(simulation_id) - + def list_simulations(self, project_id: Optional[str] = None) -> List[SimulationState]: - """列出所有模拟""" + """List all simulations, optionally filtered by ``project_id``.""" simulations = [] - + if os.path.exists(self.SIMULATION_DATA_DIR): for sim_id in os.listdir(self.SIMULATION_DATA_DIR): - # 跳过隐藏文件(如 .DS_Store)和非目录文件 + # Skip dotfiles (e.g. .DS_Store) and non-directories. sim_path = os.path.join(self.SIMULATION_DATA_DIR, sim_id) if sim_id.startswith('.') or not os.path.isdir(sim_path): continue - + state = self._load_simulation_state(sim_id) if state: if project_id is None or state.project_id == project_id: simulations.append(state) - + return simulations - + def get_profiles(self, simulation_id: str, platform: str = "reddit") -> List[Dict[str, Any]]: - """获取模拟的Agent Profile""" + """Return the persisted agent profiles for a platform.""" state = self._load_simulation_state(simulation_id) if not state: raise ValueError(f"模拟不存在: {simulation_id}") - + sim_dir = self._get_simulation_dir(simulation_id) profile_path = os.path.join(sim_dir, f"{platform}_profiles.json") - + if not os.path.exists(profile_path): return [] - + with open(profile_path, 'r', encoding='utf-8') as f: return json.load(f) - + def get_simulation_config(self, simulation_id: str) -> Optional[Dict[str, Any]]: - """获取模拟配置""" + """Return the persisted simulation config dict, or ``None`` if absent.""" sim_dir = self._get_simulation_dir(simulation_id) config_path = os.path.join(sim_dir, "simulation_config.json") - + if not os.path.exists(config_path): return None - + with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) - + def get_run_instructions(self, simulation_id: str) -> Dict[str, str]: - """获取运行说明""" + """Return shell commands and instructions to launch the simulation manually.""" sim_dir = self._get_simulation_dir(simulation_id) config_path = os.path.join(sim_dir, "simulation_config.json") scripts_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../scripts')) - + return { "simulation_dir": sim_dir, "scripts_dir": scripts_dir, diff --git a/backend/app/services/zep_entity_reader.py b/backend/app/services/zep_entity_reader.py index 905468ac..ca1dd0c5 100644 --- a/backend/app/services/zep_entity_reader.py +++ b/backend/app/services/zep_entity_reader.py @@ -1,6 +1,7 @@ -""" -Zep实体读取与过滤服务 -从Zep图谱中读取节点,筛选出符合预定义实体类型的节点 +"""Zep entity reader and filter service. + +Reads nodes from a Zep graph and filters down to those that match a +predefined ontology of entity types. """ import time @@ -16,23 +17,23 @@ from ..utils.locale import t logger = get_logger('mirofish.zep_entity_reader') -# 用于泛型返回类型 +# Generic return-type variable. T = TypeVar('T') @dataclass class EntityNode: - """实体节点数据结构""" + """In-memory representation of an entity node from the graph.""" uuid: str name: str labels: List[str] summary: str attributes: Dict[str, Any] - # 相关的边信息 + # Edges connected to this entity. related_edges: List[Dict[str, Any]] = field(default_factory=list) - # 相关的其他节点信息 + # Other nodes connected through related edges. related_nodes: List[Dict[str, Any]] = field(default_factory=list) - + def to_dict(self) -> Dict[str, Any]: return { "uuid": self.uuid, @@ -43,9 +44,9 @@ class EntityNode: "related_edges": self.related_edges, "related_nodes": self.related_nodes, } - + def get_entity_type(self) -> Optional[str]: - """获取实体类型(排除默认的Entity标签)""" + """Return the first non-default label, or ``None`` if only defaults are present.""" for label in self.labels: if label not in ["Entity", "Node"]: return label @@ -54,12 +55,12 @@ class EntityNode: @dataclass class FilteredEntities: - """过滤后的实体集合""" + """Result of a filter pass over the graph: matching entities + counts.""" entities: List[EntityNode] entity_types: Set[str] total_count: int filtered_count: int - + def to_dict(self) -> Dict[str, Any]: return { "entities": [e.to_dict() for e in self.entities], @@ -70,40 +71,38 @@ class FilteredEntities: class ZepEntityReader: + """Read entities from a Zep graph and filter to ontology-defined types. + + Capabilities: + 1. Read all nodes from the graph. + 2. Keep nodes whose labels include something other than the default ``Entity``. + 3. Optionally enrich each entity with its connected edges and neighboring nodes. """ - Zep实体读取与过滤服务 - - 主要功能: - 1. 从Zep图谱读取所有节点 - 2. 筛选出符合预定义实体类型的节点(Labels不只是Entity的节点) - 3. 获取每个实体的相关边和关联节点信息 - """ - + def __init__(self, api_key: Optional[str] = None): self.client = GraphitiAdapter() - + def _call_with_retry( - self, - func: Callable[[], T], + self, + func: Callable[[], T], operation_name: str, max_retries: int = 3, initial_delay: float = 2.0 ) -> T: - """ - 带重试机制的Zep API调用 - + """Call a Zep API function with retry on failure. + Args: - func: 要执行的函数(无参数的lambda或callable) - operation_name: 操作名称,用于日志 - max_retries: 最大重试次数(默认3次,即最多尝试3次) - initial_delay: 初始延迟秒数 - + func: A zero-argument callable performing the request. + operation_name: Operation label used in log output. + max_retries: Maximum number of attempts (default 3 — i.e. up to 3 tries total). + initial_delay: Initial delay between retries in seconds. + Returns: - API调用结果 + The return value of ``func``. """ last_exception = None delay = initial_delay - + for attempt in range(max_retries): try: return func() @@ -114,21 +113,20 @@ class ZepEntityReader: t("log.zep_entity_reader.m001", operation_name=operation_name, attempt=attempt + 1, str=str(e)[:100], delay=delay) ) time.sleep(delay) - delay *= 2 # 指数退避 + delay *= 2 # exponential backoff else: logger.error(t("log.zep_entity_reader.m002", operation_name=operation_name, max_retries=max_retries, str=str(e))) - + raise last_exception - + def get_all_nodes(self, graph_id: str) -> List[Dict[str, Any]]: - """ - 获取图谱的所有节点(分页获取) + """Return every node in the graph (paginated under the hood). Args: - graph_id: 图谱ID + graph_id: Graph identifier. Returns: - 节点列表 + A list of node dicts. """ logger.info(t("log.zep_entity_reader.m003", graph_id=graph_id)) @@ -148,14 +146,13 @@ class ZepEntityReader: return nodes_data def get_all_edges(self, graph_id: str) -> List[Dict[str, Any]]: - """ - 获取图谱的所有边(分页获取) + """Return every edge in the graph (paginated under the hood). Args: - graph_id: 图谱ID + graph_id: Graph identifier. Returns: - 边列表 + A list of edge dicts. """ logger.info(t("log.zep_entity_reader.m005", graph_id=graph_id)) @@ -174,24 +171,23 @@ class ZepEntityReader: logger.info(t("log.zep_entity_reader.m006", len=len(edges_data))) return edges_data - + def get_node_edges(self, node_uuid: str) -> List[Dict[str, Any]]: - """ - 获取指定节点的所有相关边(带重试机制) - + """Return every edge connected to the given node (with retry). + Args: - node_uuid: 节点UUID - + node_uuid: Node UUID. + Returns: - 边列表 + A list of edge dicts. """ try: - # 使用重试机制调用Zep API + # Wrap the API call in retry logic. edges = self._call_with_retry( func=lambda: self.client.graph.node.get_entity_edges(node_uuid=node_uuid), operation_name=f"获取节点边(node={node_uuid[:8]}...)" ) - + edges_data = [] for edge in edges: edges_data.append({ @@ -202,32 +198,31 @@ class ZepEntityReader: "target_node_uuid": edge.target_node_uuid, "attributes": edge.attributes or {}, }) - + return edges_data except Exception as e: logger.warning(t("log.zep_entity_reader.m007", node_uuid=node_uuid, str=str(e))) return [] - + def filter_defined_entities( - self, + self, graph_id: str, defined_entity_types: Optional[List[str]] = None, enrich_with_edges: bool = True ) -> FilteredEntities: - """ - 筛选出符合预定义实体类型的节点 - - 筛选逻辑: - - 如果节点的Labels只有一个"Entity",说明这个实体不符合我们预定义的类型,跳过 - - 如果节点的Labels包含除"Entity"和"Node"之外的标签,说明符合预定义类型,保留 - + """Filter nodes down to entities matching the predefined ontology types. + + Filtering rules: + - Skip nodes whose only label is ``Entity`` (uncategorized). + - Keep nodes whose labels include anything other than ``Entity`` and ``Node``. + Args: - graph_id: 图谱ID - defined_entity_types: 预定义的实体类型列表(可选,如果提供则只保留这些类型) - enrich_with_edges: 是否获取每个实体的相关边信息 - + graph_id: Graph identifier. + defined_entity_types: Optional allow-list; when provided, only matching types are kept. + enrich_with_edges: When ``True``, populate related_edges and related_nodes. + Returns: - FilteredEntities: 过滤后的实体集合 + A ``FilteredEntities`` summary. """ logger.info(t("log.zep_entity_reader.m008", graph_id=graph_id)) @@ -243,7 +238,7 @@ class ZepEntityReader: except Exception: pass - # 获取所有节点 + # Read every node from the graph. all_nodes = self.get_all_nodes(graph_id) total_count = len(all_nodes) @@ -259,27 +254,27 @@ class ZepEntityReader: if entity_type != "Entity": node["labels"] = [entity_type] + labels - # 获取所有边(用于后续关联查找) + # Read every edge so we can enrich entities later. all_edges = self.get_all_edges(graph_id) if enrich_with_edges else [] - # 构建节点UUID到节点数据的映射 + # uuid -> node-data map for fast lookup. node_map = {n["uuid"]: n for n in all_nodes} - # 筛选符合条件的实体 + # Filter to entities that match the criteria. filtered_entities = [] entity_types_found = set() for node in all_nodes: labels = node.get("labels", []) - # 筛选逻辑:Labels必须包含除"Entity"和"Node"之外的标签 + # Filtering rule: labels must contain something other than the defaults. custom_labels = [l for l in labels if l not in ["Entity", "Node"]] if not custom_labels: - # 只有默认标签,跳过 + # Only default labels — skip. continue - - # 如果指定了预定义类型,检查是否匹配 + + # When a predefined-type list is supplied, require a match against it. if defined_entity_types: matching_labels = [l for l in custom_labels if l in defined_entity_types] if not matching_labels: @@ -287,10 +282,9 @@ class ZepEntityReader: entity_type = matching_labels[0] else: entity_type = custom_labels[0] - + entity_types_found.add(entity_type) - - # 创建实体节点对象 + entity = EntityNode( uuid=node["uuid"], name=node["name"], @@ -298,12 +292,12 @@ class ZepEntityReader: summary=node["summary"], attributes=node["attributes"], ) - - # 获取相关边和节点 + + # Enrich with related edges and neighboring nodes. if enrich_with_edges: related_edges = [] related_node_uuids = set() - + for edge in all_edges: if edge["source_node_uuid"] == node["uuid"]: related_edges.append({ @@ -321,10 +315,10 @@ class ZepEntityReader: "source_node_uuid": edge["source_node_uuid"], }) related_node_uuids.add(edge["source_node_uuid"]) - + entity.related_edges = related_edges - - # 获取关联节点的基本信息 + + # Populate basic info for each neighboring node. related_nodes = [] for related_uuid in related_node_uuids: if related_uuid in node_map: @@ -335,56 +329,55 @@ class ZepEntityReader: "labels": related_node["labels"], "summary": related_node.get("summary", ""), }) - + entity.related_nodes = related_nodes - + filtered_entities.append(entity) - + logger.info(t("log.zep_entity_reader.m009", total_count=total_count, len=len(filtered_entities), entity_types_found=entity_types_found)) - + return FilteredEntities( entities=filtered_entities, entity_types=entity_types_found, total_count=total_count, filtered_count=len(filtered_entities), ) - + def get_entity_with_context( - self, - graph_id: str, + self, + graph_id: str, entity_uuid: str ) -> Optional[EntityNode]: - """ - 获取单个实体及其完整上下文(边和关联节点,带重试机制) - + """Fetch a single entity with its full context (edges + neighbors), with retry. + Args: - graph_id: 图谱ID - entity_uuid: 实体UUID - + graph_id: Graph identifier. + entity_uuid: Entity UUID. + Returns: - EntityNode或None + ``EntityNode`` or ``None`` if not found. """ try: - # 使用重试机制获取节点 + # Fetch the node with retry. node = self._call_with_retry( func=lambda: self.client.graph.node.get(uuid_=entity_uuid), operation_name=f"获取节点详情(uuid={entity_uuid[:8]}...)" ) - + if not node: return None - - # 获取节点的边 + + # Edges connected to this node. edges = self.get_node_edges(entity_uuid) - - # 获取所有节点用于关联查找 + + # All graph nodes, used for neighbor lookup. all_nodes = self.get_all_nodes(graph_id) node_map = {n["uuid"]: n for n in all_nodes} - - # 处理相关边和节点 + + # Collect related edges and neighboring uuids. related_edges = [] related_node_uuids = set() - + for edge in edges: if edge["source_node_uuid"] == entity_uuid: related_edges.append({ @@ -402,8 +395,8 @@ class ZepEntityReader: "source_node_uuid": edge["source_node_uuid"], }) related_node_uuids.add(edge["source_node_uuid"]) - - # 获取关联节点信息 + + # Populate basic info for each neighboring node. related_nodes = [] for related_uuid in related_node_uuids: if related_uuid in node_map: @@ -414,7 +407,7 @@ class ZepEntityReader: "labels": related_node["labels"], "summary": related_node.get("summary", ""), }) - + return EntityNode( uuid=getattr(node, 'uuid_', None) or getattr(node, 'uuid', ''), name=node.name or "", @@ -424,27 +417,26 @@ class ZepEntityReader: related_edges=related_edges, related_nodes=related_nodes, ) - + except Exception as e: logger.error(t("log.zep_entity_reader.m010", entity_uuid=entity_uuid, str=str(e))) return None - + def get_entities_by_type( - self, - graph_id: str, + self, + graph_id: str, entity_type: str, enrich_with_edges: bool = True ) -> List[EntityNode]: - """ - 获取指定类型的所有实体 - + """Return every entity matching the given type. + Args: - graph_id: 图谱ID - entity_type: 实体类型(如 "Student", "PublicFigure" 等) - enrich_with_edges: 是否获取相关边信息 - + graph_id: Graph identifier. + entity_type: Entity type label (e.g. ``Student``, ``PublicFigure``). + enrich_with_edges: When ``True``, populate related edges/nodes. + Returns: - 实体列表 + A list of matching ``EntityNode`` instances. """ result = self.filter_defined_entities( graph_id=graph_id, diff --git a/backend/run.py b/backend/run.py index 4e3b04fa..2d2e7cd4 100644 --- a/backend/run.py +++ b/backend/run.py @@ -1,21 +1,20 @@ -""" -MiroFish Backend 启动入口 -""" +"""MiroFish backend entry point.""" import os import sys -# 解决 Windows 控制台中文乱码问题:在所有导入之前设置 UTF-8 编码 +# Force UTF-8 on Windows console before importing anything that might write to +# stdout/stderr; otherwise non-ASCII characters render as mojibake. if sys.platform == 'win32': - # 设置环境变量确保 Python 使用 UTF-8 + # Make sure Python itself uses UTF-8. os.environ.setdefault('PYTHONIOENCODING', 'utf-8') - # 重新配置标准输出流为 UTF-8 + # Reconfigure the standard streams to UTF-8. if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace') if hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(encoding='utf-8', errors='replace') -# 添加项目根目录到路径 +# Add the project root to sys.path so the ``app`` package resolves. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app import create_app @@ -23,8 +22,7 @@ from app.config import Config def main(): - """主函数""" - # 验证配置 + """Validate configuration and start the Flask development server.""" errors = Config.validate() if errors: print("配置错误:") @@ -32,19 +30,16 @@ def main(): print(f" - {err}") print("\n请检查 .env 文件中的配置") sys.exit(1) - - # 创建应用 + app = create_app() - - # 获取运行配置 + + # Resolve runtime host/port from the environment. host = os.environ.get('FLASK_HOST', '0.0.0.0') port = int(os.environ.get('FLASK_PORT', 5001)) debug = Config.DEBUG - - # 启动服务 + app.run(host=host, port=port, debug=debug, threaded=True) if __name__ == '__main__': main() -