diff --git a/backend/app/api/report.py b/backend/app/api/report.py index e05c73c3..efcb9aeb 100644 --- a/backend/app/api/report.py +++ b/backend/app/api/report.py @@ -6,7 +6,7 @@ Report API路由 import os import traceback import threading -from flask import request, jsonify, send_file +from flask import request, jsonify, send_file, after_this_request from . import report_bp from ..config import Config @@ -414,7 +414,16 @@ def download_report(report_id: str): with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(report.markdown_content) temp_path = f.name - + + @after_this_request + def cleanup(response): + try: + if temp_path and os.path.exists(temp_path): + os.unlink(temp_path) + except OSError: + pass + return response + return send_file( temp_path, as_attachment=True, diff --git a/backend/app/api/simulation.py b/backend/app/api/simulation.py index 3a0f6816..b1f7e7ee 100644 --- a/backend/app/api/simulation.py +++ b/backend/app/api/simulation.py @@ -2016,28 +2016,26 @@ def get_simulation_posts(simulation_id: str): }) import sqlite3 - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - try: - cursor.execute(""" - SELECT * FROM post - ORDER BY created_at DESC - LIMIT ? OFFSET ? - """, (limit, offset)) - - posts = [dict(row) for row in cursor.fetchall()] - - cursor.execute("SELECT COUNT(*) FROM post") - total = cursor.fetchone()[0] - - except sqlite3.OperationalError: - posts = [] - total = 0 - - conn.close() - + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cursor.execute(""" + SELECT * FROM post + ORDER BY created_at DESC + LIMIT ? OFFSET ? + """, (limit, offset)) + + posts = [dict(row) for row in cursor.fetchall()] + + cursor.execute("SELECT COUNT(*) FROM post") + total = cursor.fetchone()[0] + + except sqlite3.OperationalError: + posts = [] + total = 0 + return jsonify({ "success": True, "data": { @@ -2089,32 +2087,30 @@ def get_simulation_comments(simulation_id: str): }) import sqlite3 - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - try: - if post_id: - cursor.execute(""" - SELECT * FROM comment - WHERE post_id = ? - ORDER BY created_at DESC - LIMIT ? OFFSET ? - """, (post_id, limit, offset)) - else: - cursor.execute(""" - SELECT * FROM comment - ORDER BY created_at DESC - LIMIT ? OFFSET ? - """, (limit, offset)) - - comments = [dict(row) for row in cursor.fetchall()] - - except sqlite3.OperationalError: - comments = [] - - conn.close() - + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + if post_id: + cursor.execute(""" + SELECT * FROM comment + WHERE post_id = ? + ORDER BY created_at DESC + LIMIT ? OFFSET ? + """, (post_id, limit, offset)) + else: + cursor.execute(""" + SELECT * FROM comment + ORDER BY created_at DESC + LIMIT ? OFFSET ? + """, (limit, offset)) + + comments = [dict(row) for row in cursor.fetchall()] + + except sqlite3.OperationalError: + comments = [] + return jsonify({ "success": True, "data": { diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index 8c35380d..71d7fb1d 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -425,27 +425,31 @@ class SimulationRunner: # 创建主日志文件,避免 stdout/stderr 管道缓冲区满导致进程阻塞 main_log_path = os.path.join(sim_dir, "simulation.log") main_log_file = open(main_log_path, 'w', encoding='utf-8') - - # 设置子进程环境变量,确保 Windows 上使用 UTF-8 编码 - # 这可以修复第三方库(如 OASIS)读取文件时未指定编码的问题 - env = os.environ.copy() - env['PYTHONUTF8'] = '1' # Python 3.7+ 支持,让所有 open() 默认使用 UTF-8 - env['PYTHONIOENCODING'] = 'utf-8' # 确保 stdout/stderr 使用 UTF-8 - - # 设置工作目录为模拟目录(数据库等文件会生成在此) - # 使用 start_new_session=True 创建新的进程组,确保可以通过 os.killpg 终止所有子进程 - process = subprocess.Popen( - cmd, - cwd=sim_dir, - stdout=main_log_file, - stderr=subprocess.STDOUT, # stderr 也写入同一个文件 - text=True, - encoding='utf-8', # 显式指定编码 - bufsize=1, - env=env, # 传递带有 UTF-8 设置的环境变量 - start_new_session=True, # 创建新进程组,确保服务器关闭时能终止所有相关进程 - ) - + + try: + # 设置子进程环境变量,确保 Windows 上使用 UTF-8 编码 + # 这可以修复第三方库(如 OASIS)读取文件时未指定编码的问题 + env = os.environ.copy() + env['PYTHONUTF8'] = '1' # Python 3.7+ 支持,让所有 open() 默认使用 UTF-8 + env['PYTHONIOENCODING'] = 'utf-8' # 确保 stdout/stderr 使用 UTF-8 + + # 设置工作目录为模拟目录(数据库等文件会生成在此) + # 使用 start_new_session=True 创建新的进程组,确保可以通过 os.killpg 终止所有子进程 + process = subprocess.Popen( + cmd, + cwd=sim_dir, + stdout=main_log_file, + stderr=subprocess.STDOUT, # stderr 也写入同一个文件 + text=True, + encoding='utf-8', # 显式指定编码 + bufsize=1, + env=env, # 传递带有 UTF-8 设置的环境变量 + start_new_session=True, # 创建新进程组,确保服务器关闭时能终止所有相关进程 + ) + except Exception: + main_log_file.close() + raise + # 保存文件句柄以便后续关闭 cls._stdout_files[simulation_id] = main_log_file cls._stderr_files[simulation_id] = None # 不再需要单独的 stderr @@ -1667,41 +1671,39 @@ class SimulationRunner: results = [] try: - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - if agent_id is not None: - cursor.execute(""" - SELECT user_id, info, created_at - FROM trace - WHERE action = 'interview' AND user_id = ? - ORDER BY created_at DESC - LIMIT ? - """, (agent_id, limit)) - else: - cursor.execute(""" - SELECT user_id, info, created_at - FROM trace - WHERE action = 'interview' - ORDER BY created_at DESC - LIMIT ? - """, (limit,)) - - for user_id, info_json, created_at in cursor.fetchall(): - try: - info = json.loads(info_json) if info_json else {} - except json.JSONDecodeError: - info = {"raw": info_json} - - results.append({ - "agent_id": user_id, - "response": info.get("response", info), - "prompt": info.get("prompt", ""), - "timestamp": created_at, - "platform": platform_name - }) - - conn.close() + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + if agent_id is not None: + cursor.execute(""" + SELECT user_id, info, created_at + FROM trace + WHERE action = 'interview' AND user_id = ? + ORDER BY created_at DESC + LIMIT ? + """, (agent_id, limit)) + else: + cursor.execute(""" + SELECT user_id, info, created_at + FROM trace + WHERE action = 'interview' + ORDER BY created_at DESC + LIMIT ? + """, (limit,)) + + for user_id, info_json, created_at in cursor.fetchall(): + try: + info = json.loads(info_json) if info_json else {} + except json.JSONDecodeError: + info = {"raw": info_json} + + results.append({ + "agent_id": user_id, + "response": info.get("response", info), + "prompt": info.get("prompt", ""), + "timestamp": created_at, + "platform": platform_name + }) except Exception as e: logger.error(f"读取Interview历史失败 ({platform_name}): {e}") diff --git a/backend/scripts/run_parallel_simulation.py b/backend/scripts/run_parallel_simulation.py index 2a627ffd..3a58a242 100644 --- a/backend/scripts/run_parallel_simulation.py +++ b/backend/scripts/run_parallel_simulation.py @@ -528,29 +528,27 @@ class ParallelIPCHandler: return result try: - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # 查询最新的Interview记录 - cursor.execute(""" - SELECT user_id, info, created_at - FROM trace - WHERE action = ? AND user_id = ? - ORDER BY created_at DESC - LIMIT 1 - """, (ActionType.INTERVIEW.value, agent_id)) - - row = cursor.fetchone() - if row: - user_id, info_json, created_at = row - try: - info = json.loads(info_json) if info_json else {} - result["response"] = info.get("response", info) - result["timestamp"] = created_at - except json.JSONDecodeError: - result["response"] = info_json - - conn.close() + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + # 查询最新的Interview记录 + cursor.execute(""" + SELECT user_id, info, created_at + FROM trace + WHERE action = ? AND user_id = ? + ORDER BY created_at DESC + LIMIT 1 + """, (ActionType.INTERVIEW.value, agent_id)) + + row = cursor.fetchone() + if row: + user_id, info_json, created_at = row + try: + info = json.loads(info_json) if info_json else {} + result["response"] = info.get("response", info) + result["timestamp"] = created_at + except json.JSONDecodeError: + result["response"] = info_json except Exception as e: print(f" 读取Interview结果失败: {e}") @@ -679,67 +677,65 @@ def fetch_new_actions_from_db( return actions, new_last_rowid try: - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # 使用 rowid 来追踪已处理的记录(rowid 是 SQLite 的内置自增字段) - # 这样可以避免 created_at 格式差异问题(Twitter 用整数,Reddit 用日期时间字符串) - cursor.execute(""" - SELECT rowid, user_id, action, info - FROM trace - WHERE rowid > ? - ORDER BY rowid ASC - """, (last_rowid,)) - - for rowid, user_id, action, info_json in cursor.fetchall(): - # 更新最大 rowid - new_last_rowid = rowid - - # 过滤非核心动作 - if action in FILTERED_ACTIONS: - continue - - # 解析动作参数 - try: - action_args = json.loads(info_json) if info_json else {} - except json.JSONDecodeError: - action_args = {} - - # 精简 action_args,只保留关键字段(保留完整内容,不截断) - simplified_args = {} - if 'content' in action_args: - simplified_args['content'] = action_args['content'] - if 'post_id' in action_args: - simplified_args['post_id'] = action_args['post_id'] - if 'comment_id' in action_args: - simplified_args['comment_id'] = action_args['comment_id'] - if 'quoted_id' in action_args: - simplified_args['quoted_id'] = action_args['quoted_id'] - if 'new_post_id' in action_args: - simplified_args['new_post_id'] = action_args['new_post_id'] - if 'follow_id' in action_args: - simplified_args['follow_id'] = action_args['follow_id'] - if 'query' in action_args: - simplified_args['query'] = action_args['query'] - if 'like_id' in action_args: - simplified_args['like_id'] = action_args['like_id'] - if 'dislike_id' in action_args: - simplified_args['dislike_id'] = action_args['dislike_id'] - - # 转换动作类型名称 - action_type = ACTION_TYPE_MAP.get(action, action.upper()) - - # 补充上下文信息(帖子内容、用户名等) - _enrich_action_context(cursor, action_type, simplified_args, agent_names) - - actions.append({ - 'agent_id': user_id, - 'agent_name': agent_names.get(user_id, f'Agent_{user_id}'), - 'action_type': action_type, - 'action_args': simplified_args, - }) - - conn.close() + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + # 使用 rowid 来追踪已处理的记录(rowid 是 SQLite 的内置自增字段) + # 这样可以避免 created_at 格式差异问题(Twitter 用整数,Reddit 用日期时间字符串) + cursor.execute(""" + SELECT rowid, user_id, action, info + FROM trace + WHERE rowid > ? + ORDER BY rowid ASC + """, (last_rowid,)) + + for rowid, user_id, action, info_json in cursor.fetchall(): + # 更新最大 rowid + new_last_rowid = rowid + + # 过滤非核心动作 + if action in FILTERED_ACTIONS: + continue + + # 解析动作参数 + try: + action_args = json.loads(info_json) if info_json else {} + except json.JSONDecodeError: + action_args = {} + + # 精简 action_args,只保留关键字段(保留完整内容,不截断) + simplified_args = {} + if 'content' in action_args: + simplified_args['content'] = action_args['content'] + if 'post_id' in action_args: + simplified_args['post_id'] = action_args['post_id'] + if 'comment_id' in action_args: + simplified_args['comment_id'] = action_args['comment_id'] + if 'quoted_id' in action_args: + simplified_args['quoted_id'] = action_args['quoted_id'] + if 'new_post_id' in action_args: + simplified_args['new_post_id'] = action_args['new_post_id'] + if 'follow_id' in action_args: + simplified_args['follow_id'] = action_args['follow_id'] + if 'query' in action_args: + simplified_args['query'] = action_args['query'] + if 'like_id' in action_args: + simplified_args['like_id'] = action_args['like_id'] + if 'dislike_id' in action_args: + simplified_args['dislike_id'] = action_args['dislike_id'] + + # 转换动作类型名称 + action_type = ACTION_TYPE_MAP.get(action, action.upper()) + + # 补充上下文信息(帖子内容、用户名等) + _enrich_action_context(cursor, action_type, simplified_args, agent_names) + + actions.append({ + 'agent_id': user_id, + 'agent_name': agent_names.get(user_id, f'Agent_{user_id}'), + 'action_type': action_type, + 'action_args': simplified_args, + }) except Exception as e: print(f"读取数据库动作失败: {e}") diff --git a/backend/scripts/run_reddit_simulation.py b/backend/scripts/run_reddit_simulation.py index 14907cbd..99c49e27 100644 --- a/backend/scripts/run_reddit_simulation.py +++ b/backend/scripts/run_reddit_simulation.py @@ -311,30 +311,28 @@ class IPCHandler: return result try: - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # 查询最新的Interview记录 - cursor.execute(""" - SELECT user_id, info, created_at - FROM trace - WHERE action = ? AND user_id = ? - ORDER BY created_at DESC - LIMIT 1 - """, (ActionType.INTERVIEW.value, agent_id)) - - row = cursor.fetchone() - if row: - user_id, info_json, created_at = row - try: - info = json.loads(info_json) if info_json else {} - result["response"] = info.get("response", info) - result["timestamp"] = created_at - except json.JSONDecodeError: - result["response"] = info_json - - conn.close() - + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + # 查询最新的Interview记录 + cursor.execute(""" + SELECT user_id, info, created_at + FROM trace + WHERE action = ? AND user_id = ? + ORDER BY created_at DESC + LIMIT 1 + """, (ActionType.INTERVIEW.value, agent_id)) + + row = cursor.fetchone() + if row: + user_id, info_json, created_at = row + try: + info = json.loads(info_json) if info_json else {} + result["response"] = info.get("response", info) + result["timestamp"] = created_at + except json.JSONDecodeError: + result["response"] = info_json + except Exception as e: print(f" 读取Interview结果失败: {e}") diff --git a/backend/scripts/run_twitter_simulation.py b/backend/scripts/run_twitter_simulation.py index caab9e9d..038f45d4 100644 --- a/backend/scripts/run_twitter_simulation.py +++ b/backend/scripts/run_twitter_simulation.py @@ -311,30 +311,28 @@ class IPCHandler: return result try: - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # 查询最新的Interview记录 - cursor.execute(""" - SELECT user_id, info, created_at - FROM trace - WHERE action = ? AND user_id = ? - ORDER BY created_at DESC - LIMIT 1 - """, (ActionType.INTERVIEW.value, agent_id)) - - row = cursor.fetchone() - if row: - user_id, info_json, created_at = row - try: - info = json.loads(info_json) if info_json else {} - result["response"] = info.get("response", info) - result["timestamp"] = created_at - except json.JSONDecodeError: - result["response"] = info_json - - conn.close() - + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + + # 查询最新的Interview记录 + cursor.execute(""" + SELECT user_id, info, created_at + FROM trace + WHERE action = ? AND user_id = ? + ORDER BY created_at DESC + LIMIT 1 + """, (ActionType.INTERVIEW.value, agent_id)) + + row = cursor.fetchone() + if row: + user_id, info_json, created_at = row + try: + info = json.loads(info_json) if info_json else {} + result["response"] = info.get("response", info) + result["timestamp"] = created_at + except json.JSONDecodeError: + result["response"] = info_json + except Exception as e: print(f" 读取Interview结果失败: {e}")