fix: resource leaks and connection safety

- Use context managers for all SQLite connections
- Add temp file cleanup in report download endpoint
- Add finally blocks for file handle cleanup in simulation runner
This commit is contained in:
Nyk 2026-03-17 21:19:27 +07:00
parent 985f89f49a
commit 7d76f663dc
6 changed files with 237 additions and 238 deletions

View File

@ -6,7 +6,7 @@ Report API路由
import os import os
import traceback import traceback
import threading import threading
from flask import request, jsonify, send_file from flask import request, jsonify, send_file, after_this_request
from . import report_bp from . import report_bp
from ..config import Config from ..config import Config
@ -414,7 +414,16 @@ def download_report(report_id: str):
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(report.markdown_content) f.write(report.markdown_content)
temp_path = f.name temp_path = f.name
@after_this_request
def cleanup(response):
try:
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
except OSError:
pass
return response
return send_file( return send_file(
temp_path, temp_path,
as_attachment=True, as_attachment=True,

View File

@ -2016,28 +2016,26 @@ def get_simulation_posts(simulation_id: str):
}) })
import sqlite3 import sqlite3
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
cursor = conn.cursor() cursor = conn.cursor()
try: try:
cursor.execute(""" cursor.execute("""
SELECT * FROM post SELECT * FROM post
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
""", (limit, offset)) """, (limit, offset))
posts = [dict(row) for row in cursor.fetchall()] posts = [dict(row) for row in cursor.fetchall()]
cursor.execute("SELECT COUNT(*) FROM post") cursor.execute("SELECT COUNT(*) FROM post")
total = cursor.fetchone()[0] total = cursor.fetchone()[0]
except sqlite3.OperationalError: except sqlite3.OperationalError:
posts = [] posts = []
total = 0 total = 0
conn.close()
return jsonify({ return jsonify({
"success": True, "success": True,
"data": { "data": {
@ -2089,32 +2087,30 @@ def get_simulation_comments(simulation_id: str):
}) })
import sqlite3 import sqlite3
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
cursor = conn.cursor() cursor = conn.cursor()
try: try:
if post_id: if post_id:
cursor.execute(""" cursor.execute("""
SELECT * FROM comment SELECT * FROM comment
WHERE post_id = ? WHERE post_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
""", (post_id, limit, offset)) """, (post_id, limit, offset))
else: else:
cursor.execute(""" cursor.execute("""
SELECT * FROM comment SELECT * FROM comment
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
""", (limit, offset)) """, (limit, offset))
comments = [dict(row) for row in cursor.fetchall()] comments = [dict(row) for row in cursor.fetchall()]
except sqlite3.OperationalError: except sqlite3.OperationalError:
comments = [] comments = []
conn.close()
return jsonify({ return jsonify({
"success": True, "success": True,
"data": { "data": {

View File

@ -425,27 +425,31 @@ class SimulationRunner:
# 创建主日志文件,避免 stdout/stderr 管道缓冲区满导致进程阻塞 # 创建主日志文件,避免 stdout/stderr 管道缓冲区满导致进程阻塞
main_log_path = os.path.join(sim_dir, "simulation.log") main_log_path = os.path.join(sim_dir, "simulation.log")
main_log_file = open(main_log_path, 'w', encoding='utf-8') main_log_file = open(main_log_path, 'w', encoding='utf-8')
# 设置子进程环境变量,确保 Windows 上使用 UTF-8 编码 try:
# 这可以修复第三方库(如 OASIS读取文件时未指定编码的问题 # 设置子进程环境变量,确保 Windows 上使用 UTF-8 编码
env = os.environ.copy() # 这可以修复第三方库(如 OASIS读取文件时未指定编码的问题
env['PYTHONUTF8'] = '1' # Python 3.7+ 支持,让所有 open() 默认使用 UTF-8 env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8' # 确保 stdout/stderr 使用 UTF-8 env['PYTHONUTF8'] = '1' # Python 3.7+ 支持,让所有 open() 默认使用 UTF-8
env['PYTHONIOENCODING'] = 'utf-8' # 确保 stdout/stderr 使用 UTF-8
# 设置工作目录为模拟目录(数据库等文件会生成在此)
# 使用 start_new_session=True 创建新的进程组,确保可以通过 os.killpg 终止所有子进程 # 设置工作目录为模拟目录(数据库等文件会生成在此)
process = subprocess.Popen( # 使用 start_new_session=True 创建新的进程组,确保可以通过 os.killpg 终止所有子进程
cmd, process = subprocess.Popen(
cwd=sim_dir, cmd,
stdout=main_log_file, cwd=sim_dir,
stderr=subprocess.STDOUT, # stderr 也写入同一个文件 stdout=main_log_file,
text=True, stderr=subprocess.STDOUT, # stderr 也写入同一个文件
encoding='utf-8', # 显式指定编码 text=True,
bufsize=1, encoding='utf-8', # 显式指定编码
env=env, # 传递带有 UTF-8 设置的环境变量 bufsize=1,
start_new_session=True, # 创建新进程组,确保服务器关闭时能终止所有相关进程 env=env, # 传递带有 UTF-8 设置的环境变量
) start_new_session=True, # 创建新进程组,确保服务器关闭时能终止所有相关进程
)
except Exception:
main_log_file.close()
raise
# 保存文件句柄以便后续关闭 # 保存文件句柄以便后续关闭
cls._stdout_files[simulation_id] = main_log_file cls._stdout_files[simulation_id] = main_log_file
cls._stderr_files[simulation_id] = None # 不再需要单独的 stderr cls._stderr_files[simulation_id] = None # 不再需要单独的 stderr
@ -1667,41 +1671,39 @@ class SimulationRunner:
results = [] results = []
try: try:
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
if agent_id is not None: if agent_id is not None:
cursor.execute(""" cursor.execute("""
SELECT user_id, info, created_at SELECT user_id, info, created_at
FROM trace FROM trace
WHERE action = 'interview' AND user_id = ? WHERE action = 'interview' AND user_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? LIMIT ?
""", (agent_id, limit)) """, (agent_id, limit))
else: else:
cursor.execute(""" cursor.execute("""
SELECT user_id, info, created_at SELECT user_id, info, created_at
FROM trace FROM trace
WHERE action = 'interview' WHERE action = 'interview'
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT ? LIMIT ?
""", (limit,)) """, (limit,))
for user_id, info_json, created_at in cursor.fetchall(): for user_id, info_json, created_at in cursor.fetchall():
try: try:
info = json.loads(info_json) if info_json else {} info = json.loads(info_json) if info_json else {}
except json.JSONDecodeError: except json.JSONDecodeError:
info = {"raw": info_json} info = {"raw": info_json}
results.append({ results.append({
"agent_id": user_id, "agent_id": user_id,
"response": info.get("response", info), "response": info.get("response", info),
"prompt": info.get("prompt", ""), "prompt": info.get("prompt", ""),
"timestamp": created_at, "timestamp": created_at,
"platform": platform_name "platform": platform_name
}) })
conn.close()
except Exception as e: except Exception as e:
logger.error(f"读取Interview历史失败 ({platform_name}): {e}") logger.error(f"读取Interview历史失败 ({platform_name}): {e}")

View File

@ -528,29 +528,27 @@ class ParallelIPCHandler:
return result return result
try: try:
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
# 查询最新的Interview记录 # 查询最新的Interview记录
cursor.execute(""" cursor.execute("""
SELECT user_id, info, created_at SELECT user_id, info, created_at
FROM trace FROM trace
WHERE action = ? AND user_id = ? WHERE action = ? AND user_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT 1 LIMIT 1
""", (ActionType.INTERVIEW.value, agent_id)) """, (ActionType.INTERVIEW.value, agent_id))
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:
user_id, info_json, created_at = row user_id, info_json, created_at = row
try: try:
info = json.loads(info_json) if info_json else {} info = json.loads(info_json) if info_json else {}
result["response"] = info.get("response", info) result["response"] = info.get("response", info)
result["timestamp"] = created_at result["timestamp"] = created_at
except json.JSONDecodeError: except json.JSONDecodeError:
result["response"] = info_json result["response"] = info_json
conn.close()
except Exception as e: except Exception as e:
print(f" 读取Interview结果失败: {e}") print(f" 读取Interview结果失败: {e}")
@ -679,67 +677,65 @@ def fetch_new_actions_from_db(
return actions, new_last_rowid return actions, new_last_rowid
try: try:
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
# 使用 rowid 来追踪已处理的记录rowid 是 SQLite 的内置自增字段) # 使用 rowid 来追踪已处理的记录rowid 是 SQLite 的内置自增字段)
# 这样可以避免 created_at 格式差异问题Twitter 用整数Reddit 用日期时间字符串) # 这样可以避免 created_at 格式差异问题Twitter 用整数Reddit 用日期时间字符串)
cursor.execute(""" cursor.execute("""
SELECT rowid, user_id, action, info SELECT rowid, user_id, action, info
FROM trace FROM trace
WHERE rowid > ? WHERE rowid > ?
ORDER BY rowid ASC ORDER BY rowid ASC
""", (last_rowid,)) """, (last_rowid,))
for rowid, user_id, action, info_json in cursor.fetchall(): for rowid, user_id, action, info_json in cursor.fetchall():
# 更新最大 rowid # 更新最大 rowid
new_last_rowid = rowid new_last_rowid = rowid
# 过滤非核心动作 # 过滤非核心动作
if action in FILTERED_ACTIONS: if action in FILTERED_ACTIONS:
continue continue
# 解析动作参数 # 解析动作参数
try: try:
action_args = json.loads(info_json) if info_json else {} action_args = json.loads(info_json) if info_json else {}
except json.JSONDecodeError: except json.JSONDecodeError:
action_args = {} action_args = {}
# 精简 action_args只保留关键字段保留完整内容不截断 # 精简 action_args只保留关键字段保留完整内容不截断
simplified_args = {} simplified_args = {}
if 'content' in action_args: if 'content' in action_args:
simplified_args['content'] = action_args['content'] simplified_args['content'] = action_args['content']
if 'post_id' in action_args: if 'post_id' in action_args:
simplified_args['post_id'] = action_args['post_id'] simplified_args['post_id'] = action_args['post_id']
if 'comment_id' in action_args: if 'comment_id' in action_args:
simplified_args['comment_id'] = action_args['comment_id'] simplified_args['comment_id'] = action_args['comment_id']
if 'quoted_id' in action_args: if 'quoted_id' in action_args:
simplified_args['quoted_id'] = action_args['quoted_id'] simplified_args['quoted_id'] = action_args['quoted_id']
if 'new_post_id' in action_args: if 'new_post_id' in action_args:
simplified_args['new_post_id'] = action_args['new_post_id'] simplified_args['new_post_id'] = action_args['new_post_id']
if 'follow_id' in action_args: if 'follow_id' in action_args:
simplified_args['follow_id'] = action_args['follow_id'] simplified_args['follow_id'] = action_args['follow_id']
if 'query' in action_args: if 'query' in action_args:
simplified_args['query'] = action_args['query'] simplified_args['query'] = action_args['query']
if 'like_id' in action_args: if 'like_id' in action_args:
simplified_args['like_id'] = action_args['like_id'] simplified_args['like_id'] = action_args['like_id']
if 'dislike_id' in action_args: if 'dislike_id' in action_args:
simplified_args['dislike_id'] = action_args['dislike_id'] simplified_args['dislike_id'] = action_args['dislike_id']
# 转换动作类型名称 # 转换动作类型名称
action_type = ACTION_TYPE_MAP.get(action, action.upper()) action_type = ACTION_TYPE_MAP.get(action, action.upper())
# 补充上下文信息(帖子内容、用户名等) # 补充上下文信息(帖子内容、用户名等)
_enrich_action_context(cursor, action_type, simplified_args, agent_names) _enrich_action_context(cursor, action_type, simplified_args, agent_names)
actions.append({ actions.append({
'agent_id': user_id, 'agent_id': user_id,
'agent_name': agent_names.get(user_id, f'Agent_{user_id}'), 'agent_name': agent_names.get(user_id, f'Agent_{user_id}'),
'action_type': action_type, 'action_type': action_type,
'action_args': simplified_args, 'action_args': simplified_args,
}) })
conn.close()
except Exception as e: except Exception as e:
print(f"读取数据库动作失败: {e}") print(f"读取数据库动作失败: {e}")

View File

@ -311,30 +311,28 @@ class IPCHandler:
return result return result
try: try:
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
# 查询最新的Interview记录 # 查询最新的Interview记录
cursor.execute(""" cursor.execute("""
SELECT user_id, info, created_at SELECT user_id, info, created_at
FROM trace FROM trace
WHERE action = ? AND user_id = ? WHERE action = ? AND user_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT 1 LIMIT 1
""", (ActionType.INTERVIEW.value, agent_id)) """, (ActionType.INTERVIEW.value, agent_id))
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:
user_id, info_json, created_at = row user_id, info_json, created_at = row
try: try:
info = json.loads(info_json) if info_json else {} info = json.loads(info_json) if info_json else {}
result["response"] = info.get("response", info) result["response"] = info.get("response", info)
result["timestamp"] = created_at result["timestamp"] = created_at
except json.JSONDecodeError: except json.JSONDecodeError:
result["response"] = info_json result["response"] = info_json
conn.close()
except Exception as e: except Exception as e:
print(f" 读取Interview结果失败: {e}") print(f" 读取Interview结果失败: {e}")

View File

@ -311,30 +311,28 @@ class IPCHandler:
return result return result
try: try:
conn = sqlite3.connect(db_path) with sqlite3.connect(db_path) as conn:
cursor = conn.cursor() cursor = conn.cursor()
# 查询最新的Interview记录 # 查询最新的Interview记录
cursor.execute(""" cursor.execute("""
SELECT user_id, info, created_at SELECT user_id, info, created_at
FROM trace FROM trace
WHERE action = ? AND user_id = ? WHERE action = ? AND user_id = ?
ORDER BY created_at DESC ORDER BY created_at DESC
LIMIT 1 LIMIT 1
""", (ActionType.INTERVIEW.value, agent_id)) """, (ActionType.INTERVIEW.value, agent_id))
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:
user_id, info_json, created_at = row user_id, info_json, created_at = row
try: try:
info = json.loads(info_json) if info_json else {} info = json.loads(info_json) if info_json else {}
result["response"] = info.get("response", info) result["response"] = info.get("response", info)
result["timestamp"] = created_at result["timestamp"] = created_at
except json.JSONDecodeError: except json.JSONDecodeError:
result["response"] = info_json result["response"] = info_json
conn.close()
except Exception as e: except Exception as e:
print(f" 读取Interview结果失败: {e}") print(f" 读取Interview结果失败: {e}")