From 52bae0a3daee13a85d435c4b6abb2a0f36335a5a Mon Sep 17 00:00:00 2001 From: Christian Moellmann Date: Sat, 23 May 2026 12:37:04 +0200 Subject: [PATCH] feat(interviews): Flask blueprint /api/interview with task-based async + CSV export Add /api/interview blueprint with POST pre/post/rerun, GET status/results/synthesis/export.csv endpoints. Background tasks tracked by UUID in module-level dict. Add register_blueprints() helper to api/__init__.py and wire app factory through it. Add UPLOADS_DIR to Config with env-override default. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/__init__.py | 6 +- backend/app/api/__init__.py | 11 +- backend/app/api/interview.py | 161 ++++++++++++++++++ backend/app/config.py | 2 + .../tests/interviews/test_api_interview.py | 42 +++++ 5 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 backend/app/api/interview.py create mode 100644 backend/tests/interviews/test_api_interview.py diff --git a/backend/app/__init__.py b/backend/app/__init__.py index aba624bb..c2a36fd2 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -63,10 +63,8 @@ def create_app(config_class=Config): return response # 注册蓝图 - from .api import graph_bp, simulation_bp, report_bp - app.register_blueprint(graph_bp, url_prefix='/api/graph') - app.register_blueprint(simulation_bp, url_prefix='/api/simulation') - app.register_blueprint(report_bp, url_prefix='/api/report') + from .api import register_blueprints + register_blueprints(app) # 健康检查 @app.route('/health') diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index ffda743a..396750f2 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -2,13 +2,22 @@ API路由模块 """ -from flask import Blueprint +from flask import Blueprint, Flask graph_bp = Blueprint('graph', __name__) simulation_bp = Blueprint('simulation', __name__) report_bp = Blueprint('report', __name__) +interview_bp = Blueprint('interview', __name__) from . import graph # noqa: E402, F401 from . import simulation # noqa: E402, F401 from . import report # noqa: E402, F401 +from . import interview # noqa: E402, F401 + +def register_blueprints(app: Flask) -> None: + """Register all API blueprints on *app* with their canonical URL prefixes.""" + app.register_blueprint(graph_bp, url_prefix='/api/graph') + app.register_blueprint(simulation_bp, url_prefix='/api/simulation') + app.register_blueprint(report_bp, url_prefix='/api/report') + app.register_blueprint(interview_bp, url_prefix='/api/interview') diff --git a/backend/app/api/interview.py b/backend/app/api/interview.py new file mode 100644 index 00000000..993fda17 --- /dev/null +++ b/backend/app/api/interview.py @@ -0,0 +1,161 @@ +from __future__ import annotations +import threading +import traceback +import uuid +from pathlib import Path +from flask import Blueprint, jsonify, request, send_file +from app.config import Config +from app.models.interview import SubagentKind, InterviewPhase +from app.services.interviews.adapters import FileSystemPersonaProvider, ZepMemoryProvider +from app.services.interviews.zep_writer import InterviewZepWriter +from app.services.interview_orchestrator import InterviewOrchestrator +from app.services.interview_synthesizer import InterviewSynthesizer +from app.services.interviews.storage import InterviewStore +from app.utils.llm_client import LLMClient + +from . import interview_bp + +_TASKS: dict[str, dict] = {} +_LOCK = threading.Lock() + +INSTRUMENT_DIR = Path(__file__).resolve().parents[2] / "scripts" / "instruments" + + +def _uploads_root() -> Path: + return Path(getattr(Config, "UPLOADS_DIR", "uploads")) + + +def _build_orchestrator(sim_id: str) -> InterviewOrchestrator: + sim_dir = _uploads_root() / "simulations" / sim_id + reddit = sim_dir / "reddit_profiles.json" + twitter = sim_dir / "twitter_profiles.csv" + personas = FileSystemPersonaProvider(reddit_path=reddit if reddit.exists() else None, + twitter_path=twitter if twitter.exists() else None) + # Zep memory + writer: best-effort; in stub/test mode the writer no-ops on exceptions + class _NullUpdater: + def add_text_episode(self, *a, **kw): return None + try: + from app.services.zep_entity_reader import ZepEntityReader + from app.services.zep_graph_memory_updater import ZepGraphMemoryUpdater + graph_id = (sim_dir / "graph_id.txt").read_text().strip() if (sim_dir / "graph_id.txt").exists() else "" + reader = ZepEntityReader() + updater = ZepGraphMemoryUpdater() + memory = ZepMemoryProvider(reader, graph_id=graph_id) + zep_writer = InterviewZepWriter(memory_updater=updater, graph_id=graph_id) + except Exception: + class _Mem: + def get_digest(self, agent_id, max_chars=2000): + from app.services.interviews.base import MemoryDigest + return MemoryDigest(text="[memory unavailable]", available=False) + memory = _Mem() + zep_writer = InterviewZepWriter(memory_updater=_NullUpdater(), graph_id="") + llm = LLMClient(api_key=Config.LLM_API_KEY, base_url=Config.LLM_BASE_URL, + model=Config.LLM_MODEL_NAME) + return InterviewOrchestrator( + llm=llm, memory=memory, personas=personas, + instrument_dir=INSTRUMENT_DIR, store_root=_uploads_root(), sim_id=sim_id, + zep_writer=zep_writer, max_workers=Config.INTERVIEW_MAX_WORKERS, + language=Config.INTERVIEW_DEFAULT_LANGUAGE, + ) + + +def _run_task(task_id: str, fn) -> None: + with _LOCK: + _TASKS[task_id] = {"status": "running", "progress": {}, "result": None, "error": None} + try: + result = fn(task_id) + with _LOCK: + _TASKS[task_id]["status"] = "completed"; _TASKS[task_id]["result"] = result + except Exception as e: + with _LOCK: + _TASKS[task_id]["status"] = "failed" + _TASKS[task_id]["error"] = repr(e) + _TASKS[task_id]["traceback"] = traceback.format_exc() + + +def _start_task(fn) -> str: + task_id = uuid.uuid4().hex[:12] + with _LOCK: + _TASKS[task_id] = {"status": "queued", "progress": {}, "result": None, "error": None} + threading.Thread(target=_run_task, args=(task_id, fn), daemon=True).start() + return task_id + + +def _envelope(data=None, error=None, status: int = 200): + body = {"success": error is None, "data": data or {}, "error": error} + return jsonify(body), status + + +@interview_bp.route("//pre", methods=["POST"]) +def post_pre(sim_id: str): + orch = _build_orchestrator(sim_id) + task_id = _start_task(lambda tid: orch.run_pre()) + return _envelope({"task_id": task_id}) + + +@interview_bp.route("//post", methods=["POST"]) +def post_post(sim_id: str): + orch = _build_orchestrator(sim_id) + def run(tid): + out = orch.run_post() + synth = InterviewSynthesizer(store=orch.store) + out["synthesis"] = synth.run()[:1000] # short preview + return out + task_id = _start_task(run) + return _envelope({"task_id": task_id}) + + +@interview_bp.route("//rerun", methods=["POST"]) +def post_rerun(sim_id: str): + body = request.get_json(silent=True) or {} + sub = body.get("subagent") + try: subagent = SubagentKind(sub) + except ValueError: return _envelope(error=f"unknown subagent {sub!r}", status=400) + orch = _build_orchestrator(sim_id) + task_id = _start_task(lambda tid: orch.rerun(subagent)) + return _envelope({"task_id": task_id}) + + +@interview_bp.route("//status", methods=["GET"]) +def get_status(sim_id: str): + task_id = request.args.get("task_id") + with _LOCK: + task = _TASKS.get(task_id) + if task is None: return _envelope(error="unknown task_id", status=404) + return _envelope({"status": task["status"], "progress": task.get("progress", {}), + "result": task.get("result"), "error": task.get("error")}) + + +@interview_bp.route("//results/", methods=["GET"]) +def get_results(sim_id: str, subagent: str): + try: sub = SubagentKind(subagent) + except ValueError: return _envelope(error=f"unknown subagent {subagent!r}", status=400) + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + phase = InterviewPhase.T1 if sub != SubagentKind.LONGITUDINAL else InterviewPhase.T1 + run = store.latest_run(phase, sub) + if run is None: return _envelope(error="no results yet", status=404) + agg = (run / "aggregate.json") + if not agg.exists(): return _envelope(error="aggregate missing", status=404) + import json as _j + return _envelope({"aggregate": _j.loads(agg.read_text(encoding="utf-8")), + "run_dir": str(run)}) + + +@interview_bp.route("//results/synthesis", methods=["GET"]) +def get_synthesis(sim_id: str): + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + report = store.base / "synthesis" / "report.md" + if not report.exists(): + synth = InterviewSynthesizer(store=store) + synth.run() + return _envelope({"report_markdown": report.read_text(encoding="utf-8")}) + + +@interview_bp.route("//export.csv", methods=["GET"]) +def get_export_csv(sim_id: str): + store = InterviewStore(root=_uploads_root(), sim_id=sim_id) + csv_path = store.base / "synthesis" / "exports" / "all_responses.csv" + if not csv_path.exists(): + InterviewSynthesizer(store=store).run() + return send_file(csv_path, mimetype="text/csv", as_attachment=True, + download_name=f"{sim_id}_interviews.csv") diff --git a/backend/app/config.py b/backend/app/config.py index da7df8c1..11cf568a 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -39,6 +39,8 @@ class Config: MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50MB UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '../uploads') ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'} + # Root directory for simulation uploads (used by the interview subsystem) + UPLOADS_DIR = os.environ.get("UPLOADS_DIR", os.path.join(os.path.dirname(__file__), '../uploads')) # 文本处理配置 DEFAULT_CHUNK_SIZE = 500 # 默认切块大小 diff --git a/backend/tests/interviews/test_api_interview.py b/backend/tests/interviews/test_api_interview.py new file mode 100644 index 00000000..baad634b --- /dev/null +++ b/backend/tests/interviews/test_api_interview.py @@ -0,0 +1,42 @@ +import json +import os +from pathlib import Path +import pytest + +@pytest.fixture +def client(tmp_path, monkeypatch): + monkeypatch.setenv("LLM_STUB_MODE", "true") + monkeypatch.setenv("UPLOADS_DIR", str(tmp_path)) + from app.config import Config + Config.LLM_STUB_MODE = True + Config.UPLOADS_DIR = str(tmp_path) + # Seed a minimal reddit_profiles.json + sim_dir = tmp_path / "simulations" / "sim_test" + sim_dir.mkdir(parents=True) + profiles = [{"user_id": i, "user_name": f"u{i}", "name": f"A{i}", + "persona": "p", "profession": "fisher"} for i in range(3)] + (sim_dir / "reddit_profiles.json").write_text(json.dumps(profiles), encoding="utf-8") + from flask import Flask + from app.api import register_blueprints + app = Flask(__name__) + register_blueprints(app) + return app.test_client() + +def test_post_pre_returns_task_id(client): + res = client.post("/api/interview/sim_test/pre") + assert res.status_code == 200 + body = res.get_json() + assert body["success"] is True + assert "task_id" in body["data"] + +def test_status_endpoint_returns_progress(client): + res = client.post("/api/interview/sim_test/pre") + task_id = res.get_json()["data"]["task_id"] + res2 = client.get(f"/api/interview/sim_test/status?task_id={task_id}") + assert res2.status_code == 200 + assert "status" in res2.get_json()["data"] + +def test_unknown_subagent_returns_400(client): + res = client.post("/api/interview/sim_test/rerun", + json={"subagent": "nonsense"}) + assert res.status_code == 400