MicroFish/backend/app/api/report.py

1091 lines
32 KiB
Python

"""
Report API routes
Provides simulation report generation, retrieval, and chat endpoints
"""
import io
import os
import traceback
import threading
import markdown as md_lib
import fitz # PyMuPDF
from flask import request, jsonify, send_file
from . import report_bp
from ..config import Config
from ..services.report_agent import ReportAgent, ReportManager, ReportStatus
from ..services.simulation_manager import SimulationManager
from ..models.project import ProjectManager
from ..models.task import TaskManager, TaskStatus
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
logger = get_logger('mirofish.api.report')
# ============== Report generation endpoints ==============
@report_bp.route('/generate', methods=['POST'])
def generate_report():
    """
    Generate a simulation analysis report (async task)

    This is a long-running operation; the endpoint returns task_id immediately.
    Use POST /api/report/generate/status to poll progress.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",    // required, simulation ID
            "force_regenerate": false       // optional, force regeneration
        }

    Returns:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "report_id": "report_xxxx",
                "task_id": "task_xxxx",
                "status": "generating",
                "message": "Report generation task started",
                "already_generated": false
            }
        }

        When a completed report already exists and force_regenerate is not
        set, no task is started: "data" carries that report's "report_id",
        "status": "completed" and "already_generated": true instead.

    Errors:
        400 - simulation_id, graph ID or simulation requirement is missing
        404 - simulation or project not found
        500 - unexpected failure (the response includes the traceback)
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so the request falls through to the 400 validation below
        # rather than being reported as a 500 by the generic handler.
        payload = request.get_json(silent=True)
        # A JSON array/scalar has no .get(); treat it like an empty object.
        data = payload if isinstance(payload, dict) else {}

        simulation_id = data.get('simulation_id')
        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        force_regenerate = data.get('force_regenerate', False)

        # Get simulation info
        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        # Check if a report already exists
        if not force_regenerate:
            existing_report = ReportManager.get_report_by_simulation(simulation_id)
            if existing_report and existing_report.status == ReportStatus.COMPLETED:
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "report_id": existing_report.report_id,
                        "status": "completed",
                        "message": t('api.reportAlreadyExists'),
                        "already_generated": True
                    }
                })

        # Get project info
        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": t('api.projectNotFound', id=state.project_id)
            }), 404

        graph_id = state.graph_id or project.get("graph_id")
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.missingGraphIdEnsure')
            }), 400

        # Use per-simulation graph if available (set during simulation start for isolation)
        effective_graph_id = getattr(state, 'graph_id_simulation', None) or graph_id

        simulation_requirement = project.get("simulation_requirement")
        if not simulation_requirement:
            return jsonify({
                "success": False,
                "error": t('api.missingSimRequirement')
            }), 400

        # Pre-generate report_id so it can be returned immediately
        import uuid
        report_id = f"report_{uuid.uuid4().hex[:12]}"

        # Create async task
        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="report_generate",
            metadata={
                "simulation_id": simulation_id,
                "graph_id": effective_graph_id,
                "report_id": report_id
            }
        )

        # Capture locale before spawning background thread
        current_locale = get_locale()

        # Define background task
        def run_generate():
            # Re-apply the request's locale inside the worker thread so t()
            # produces messages in the caller's language.
            set_locale(current_locale)
            try:
                task_manager.update_task(
                    task_id,
                    status=TaskStatus.PROCESSING,
                    progress=0,
                    message=t('api.initReportAgent')
                )

                # Create Report Agent (use per-simulation graph if available)
                agent = ReportAgent(
                    graph_id=effective_graph_id,
                    simulation_id=simulation_id,
                    simulation_requirement=simulation_requirement
                )

                # Progress callback
                def progress_callback(stage, progress, message):
                    task_manager.update_task(
                        task_id,
                        progress=progress,
                        message=f"[{stage}] {message}"
                    )

                # Generate report (pass pre-generated report_id)
                report = agent.generate_report(
                    progress_callback=progress_callback,
                    report_id=report_id
                )

                # Save report
                ReportManager.save_report(report)

                if report.status == ReportStatus.COMPLETED:
                    task_manager.complete_task(
                        task_id,
                        result={
                            "report_id": report.report_id,
                            "simulation_id": simulation_id,
                            "status": "completed"
                        }
                    )
                else:
                    task_manager.fail_task(task_id, report.error or t('api.reportGenerateFailed'))
            except Exception as e:
                # No HTTP response carries this failure, so the stack trace
                # must go to the log here or it is lost.
                logger.error(f"Report generation failed: {str(e)}\n{traceback.format_exc()}")
                task_manager.fail_task(task_id, str(e))

        # Start background thread
        thread = threading.Thread(target=run_generate, daemon=True)
        thread.start()

        return jsonify({
            "success": True,
            "data": {
                "simulation_id": simulation_id,
                "report_id": report_id,
                "task_id": task_id,
                "status": "generating",
                "message": t('api.reportGenerateStarted'),
                "already_generated": False
            }
        })
    except Exception as e:
        logger.error(f"Failed to start report generation task: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/generate/status', methods=['POST'])
def get_generate_status():
    """
    Query report generation task progress

    If simulation_id is given and that simulation already has a completed
    report, a "completed" payload is returned without looking up the task.
    Otherwise task_id is required and the task's own state is returned.

    Request (JSON):
        {
            "task_id": "task_xxxx",         // optional, task_id from generate
            "simulation_id": "sim_xxxx"     // optional, simulation ID
        }

    Returns:
        {
            "success": true,
            "data": {
                "task_id": "task_xxxx",
                "status": "processing|completed|failed",
                "progress": 45,
                "message": "..."
            }
        }

    Errors:
        400 - no task_id supplied and no completed report was found
        404 - task_id does not match a known task
        500 - unexpected failure
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is answered with the 400 below rather than a 500.
        payload = request.get_json(silent=True)
        # A JSON array/scalar has no .get(); treat it like an empty object.
        data = payload if isinstance(payload, dict) else {}

        task_id = data.get('task_id')
        simulation_id = data.get('simulation_id')

        # If simulation_id is provided, check whether a completed report exists
        if simulation_id:
            existing_report = ReportManager.get_report_by_simulation(simulation_id)
            if existing_report and existing_report.status == ReportStatus.COMPLETED:
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "report_id": existing_report.report_id,
                        "status": "completed",
                        "progress": 100,
                        "message": t('api.reportGenerated'),
                        "already_completed": True
                    }
                })

        if not task_id:
            return jsonify({
                "success": False,
                "error": t('api.requireTaskOrSimId')
            }), 400

        task_manager = TaskManager()
        task = task_manager.get_task(task_id)
        if not task:
            return jsonify({
                "success": False,
                "error": t('api.taskNotFound', id=task_id)
            }), 404

        return jsonify({
            "success": True,
            "data": task.to_dict()
        })
    except Exception as e:
        logger.error(f"Failed to query task status: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e)
        }), 500
# ============== Report retrieval endpoints ==============
@report_bp.route('/<report_id>', methods=['GET'])
def get_report(report_id: str):
    """
    Fetch one report by its ID.

    Returns:
        {
            "success": true,
            "data": {
                "report_id": "report_xxxx",
                "simulation_id": "sim_xxxx",
                "status": "completed",
                "outline": {...},
                "markdown_content": "...",
                "created_at": "...",
                "completed_at": "..."
            }
        }

    Responds 404 when no report has this ID, 500 on unexpected failure.
    """
    try:
        found = ReportManager.get_report(report_id)
        if found:
            return jsonify({
                "success": True,
                "data": found.to_dict()
            })

        # Unknown report ID
        not_found = {
            "success": False,
            "error": t('api.reportNotFound', id=report_id)
        }
        return jsonify(not_found), 404
    except Exception as exc:
        logger.error(f"Failed to get report: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/by-simulation/<simulation_id>', methods=['GET'])
def get_report_by_simulation(simulation_id: str):
    """
    Fetch the report that belongs to a simulation.

    Returns:
        {
            "success": true,
            "data": {
                "report_id": "report_xxxx",
                ...
            },
            "has_report": true
        }

    Responds 404 with "has_report": false when the simulation has no report,
    500 on unexpected failure.
    """
    try:
        found = ReportManager.get_report_by_simulation(simulation_id)
        if found:
            return jsonify({
                "success": True,
                "data": found.to_dict(),
                "has_report": True
            })

        # No report recorded for this simulation
        missing = {
            "success": False,
            "error": t('api.noReportForSim', id=simulation_id),
            "has_report": False
        }
        return jsonify(missing), 404
    except Exception as exc:
        logger.error(f"Failed to get report: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/list', methods=['GET'])
def list_reports():
    """
    Return the stored reports, optionally narrowed to one simulation.

    Query parameters:
        simulation_id: filter by simulation ID (optional)
        limit: result count limit (default 50)

    Returns:
        {
            "success": true,
            "data": [...],
            "count": 10
        }
    """
    try:
        query = request.args
        reports = ReportManager.list_reports(
            simulation_id=query.get('simulation_id'),
            limit=query.get('limit', 50, type=int)
        )
        serialized = [item.to_dict() for item in reports]
        return jsonify({
            "success": True,
            "data": serialized,
            "count": len(reports)
        })
    except Exception as exc:
        logger.error(f"Failed to list reports: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
def _generate_pdf_bytes(markdown_content: str) -> bytes:
    """Render a Markdown string to an A4 PDF and return the PDF as bytes.

    The Markdown is converted to HTML (with the 'tables' and 'fenced_code'
    extensions), wrapped in a small styled page, and laid out page by page
    with PyMuPDF's Story / DocumentWriter.
    """
    rendered_body = md_lib.markdown(
        markdown_content,
        extensions=['tables', 'fenced_code']
    )
    document = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: sans-serif; font-size: 12pt; line-height: 1.6;
margin: 40px; color: #1a1a1a; }}
h1 {{ font-size: 22pt; border-bottom: 2px solid #333; padding-bottom: 6px; }}
h2 {{ font-size: 16pt; margin-top: 28px; }}
h3 {{ font-size: 13pt; }}
pre {{ background: #f4f4f4; padding: 10px; border-radius: 4px;
font-size: 10pt; overflow-x: auto; }}
code {{ background: #f4f4f4; padding: 1px 4px; border-radius: 3px; font-size: 10pt; }}
blockquote {{ border-left: 3px solid #aaa; margin-left: 0;
padding-left: 12px; color: #555; }}
table {{ border-collapse: collapse; width: 100%; margin: 12px 0; }}
th, td {{ border: 1px solid #ccc; padding: 6px 10px; text-align: left; }}
th {{ background: #f0f0f0; }}
</style>
</head>
<body>{rendered_body}</body>
</html>"""

    story = fitz.Story(document)
    output = io.BytesIO()
    writer = fitz.DocumentWriter(output)

    page_rect = fitz.paper_rect("a4")
    # Shrink the page rectangle by 36 units on every side to leave margins.
    content_rect = page_rect + (36, 36, -36, -36)

    # Emit at least one page, then keep adding pages while the story reports
    # that content is still left to place.
    while True:
        device = writer.begin_page(page_rect)
        remaining, _ = story.place(content_rect)
        story.draw(device)
        writer.end_page()
        if not remaining:
            break

    writer.close()
    return output.getvalue()
@report_bp.route('/<report_id>/download', methods=['GET'])
def download_report(report_id: str):
    """
    Download report in the requested format.

    The Markdown file on disk is preferred as the source; when it does not
    exist, the report's stored markdown_content is used instead.

    Query params:
        format: 'md' (default) | 'pdf'

    Returns:
        The report as a file attachment named "<report_id>.md" or
        "<report_id>.pdf".

    Errors:
        400 - unsupported format
        404 - report not found
        500 - unexpected failure (the response includes the traceback)
    """
    try:
        fmt = request.args.get('format', 'md').lower()
        if fmt not in ('md', 'pdf'):
            return jsonify({
                "success": False,
                "error": f"Unsupported format '{fmt}'. Use 'md' or 'pdf'."
            }), 400

        report = ReportManager.get_report(report_id)
        if not report:
            return jsonify({
                "success": False,
                "error": t('api.reportNotFound', id=report_id)
            }), 404

        md_path = ReportManager._get_report_markdown_path(report_id)
        md_on_disk = os.path.exists(md_path)

        # Markdown that already exists on disk is served directly; there is
        # no need to load it into memory first.
        if fmt == 'md' and md_on_disk:
            return send_file(
                md_path,
                as_attachment=True,
                download_name=f"{report_id}.md"
            )

        if md_on_disk:
            with open(md_path, 'r', encoding='utf-8') as f:
                markdown_content = f.read()
        else:
            # Guard against a report whose content has not been set, so the
            # encode/convert calls below do not fail on None.
            markdown_content = report.markdown_content or ""

        if fmt == 'md':
            return send_file(
                io.BytesIO(markdown_content.encode('utf-8')),
                mimetype='text/markdown; charset=utf-8',
                as_attachment=True,
                download_name=f"{report_id}.md"
            )

        # fmt == 'pdf'
        pdf_bytes = _generate_pdf_bytes(markdown_content)
        return send_file(
            io.BytesIO(pdf_bytes),
            mimetype='application/pdf',
            as_attachment=True,
            download_name=f"{report_id}.pdf"
        )
    except Exception as e:
        logger.error(f"Failed to download report: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/<report_id>', methods=['DELETE'])
def delete_report(report_id: str):
    """
    Remove a report.

    Responds with a confirmation message on success, 404 when the report
    manager reports nothing was deleted, and 500 on unexpected failure.
    """
    try:
        if ReportManager.delete_report(report_id):
            return jsonify({
                "success": True,
                "message": t('api.reportDeleted', id=report_id)
            })

        not_found = {
            "success": False,
            "error": t('api.reportNotFound', id=report_id)
        }
        return jsonify(not_found), 404
    except Exception as exc:
        logger.error(f"Failed to delete report: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Report Agent chat endpoint ==============
@report_bp.route('/chat', methods=['POST'])
def chat_with_report_agent():
    """
    Chat with the Report Agent

    The Report Agent can autonomously call retrieval tools to answer questions.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",            // required, simulation ID
            "message": "Explain the trend...",      // required, user message
            "chat_history": [                       // optional, conversation history
                {"role": "user", "content": "..."},
                {"role": "assistant", "content": "..."}
            ]
        }

    Returns:
        {
            "success": true,
            "data": {
                "response": "Agent reply...",
                "tool_calls": [list of tools called],
                "sources": [information sources]
            }
        }

    Errors:
        400 - simulation_id, message or graph ID is missing
        404 - simulation or project not found
        500 - unexpected failure (the response includes the traceback)
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is answered with a 400 below rather than a 500.
        payload = request.get_json(silent=True)
        # A JSON array/scalar has no .get(); treat it like an empty object.
        data = payload if isinstance(payload, dict) else {}

        simulation_id = data.get('simulation_id')
        message = data.get('message')
        # `or []` also covers an explicit "chat_history": null, which a plain
        # .get() default would pass through to the agent as None.
        chat_history = data.get('chat_history') or []

        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        if not message:
            return jsonify({
                "success": False,
                "error": t('api.requireMessage')
            }), 400

        # Get simulation and project info
        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": t('api.projectNotFound', id=state.project_id)
            }), 404

        graph_id = state.graph_id or project.get("graph_id")
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.missingGraphId')
            }), 400

        # Use per-simulation graph if available
        effective_graph_id = getattr(state, 'graph_id_simulation', None) or graph_id

        # Unlike report generation, chat proceeds with an empty requirement.
        simulation_requirement = project.get("simulation_requirement") or ""

        # Create agent and start chat
        agent = ReportAgent(
            graph_id=effective_graph_id,
            simulation_id=simulation_id,
            simulation_requirement=simulation_requirement
        )
        result = agent.chat(message=message, chat_history=chat_history)

        return jsonify({
            "success": True,
            "data": result
        })
    except Exception as e:
        logger.error(f"Chat failed: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== Report progress and section endpoints ==============
@report_bp.route('/<report_id>/progress', methods=['GET'])
def get_report_progress(report_id: str):
    """
    Report the current generation progress of a report (real-time).

    Returns:
        {
            "success": true,
            "data": {
                "status": "generating",
                "progress": 45,
                "message": "Generating section: Key Findings",
                "current_section": "Key Findings",
                "completed_sections": ["Executive Summary", "Simulation Background"],
                "updated_at": "2025-12-09T..."
            }
        }

    Responds 404 when no progress information is available for the report,
    500 on unexpected failure.
    """
    try:
        current = ReportManager.get_progress(report_id)
        if current:
            return jsonify({
                "success": True,
                "data": current
            })

        unavailable = {
            "success": False,
            "error": t('api.reportProgressNotAvail', id=report_id)
        }
        return jsonify(unavailable), 404
    except Exception as exc:
        logger.error(f"Failed to get report progress: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/sections', methods=['GET'])
def get_report_sections(report_id: str):
    """
    List the sections generated so far (section-by-section output).

    The frontend can poll this endpoint to pick up section content as it is
    produced, without waiting for the whole report to finish.

    Returns:
        {
            "success": true,
            "data": {
                "report_id": "report_xxxx",
                "sections": [
                    {
                        "filename": "section_01.md",
                        "section_index": 1,
                        "content": "## Executive Summary\\n\\n..."
                    },
                    ...
                ],
                "total_sections": 3,
                "is_complete": false
            }
        }
    """
    try:
        generated = ReportManager.get_generated_sections(report_id)

        # The report counts as complete only if it exists and is COMPLETED.
        report = ReportManager.get_report(report_id)
        if report is None:
            finished = False
        else:
            finished = report.status == ReportStatus.COMPLETED

        body = {
            "report_id": report_id,
            "sections": generated,
            "total_sections": len(generated),
            "is_complete": finished
        }
        return jsonify({
            "success": True,
            "data": body
        })
    except Exception as exc:
        logger.error(f"Failed to get section list: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/section/<int:section_index>', methods=['GET'])
def get_single_section(report_id: str, section_index: int):
    """
    Return the content of one generated section.

    Returns:
        {
            "success": true,
            "data": {
                "filename": "section_01.md",
                "section_index": 1,
                "content": "## Executive Summary\\n\\n..."
            }
        }

    Responds 404 when the section file does not exist, 500 on unexpected
    failure.
    """
    try:
        # Two-digit, zero-padded index as used in the section filename.
        padded_index = f"{section_index:02d}"

        path = ReportManager._get_section_path(report_id, section_index)
        if not os.path.exists(path):
            missing = {
                "success": False,
                "error": t('api.sectionNotFound', index=padded_index)
            }
            return jsonify(missing), 404

        with open(path, 'r', encoding='utf-8') as handle:
            text = handle.read()

        section = {
            "filename": f"section_{padded_index}.md",
            "section_index": section_index,
            "content": text
        }
        return jsonify({
            "success": True,
            "data": section
        })
    except Exception as exc:
        logger.error(f"Failed to get section content: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Report status check endpoint ==============
@report_bp.route('/check/<simulation_id>', methods=['GET'])
def check_report_status(simulation_id: str):
    """
    Tell whether a simulation has a report, and in what state.

    Used by the frontend to decide whether to unlock the Interview feature.

    Returns:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "has_report": true,
                "report_status": "completed",
                "report_id": "report_xxxx",
                "interview_unlocked": true
            }
        }
    """
    try:
        report = ReportManager.get_report_by_simulation(simulation_id)

        if report:
            status_value = report.status.value
            found_id = report.report_id
        else:
            status_value = None
            found_id = None

        exists = report is not None
        # Interview is unlocked only after the report is complete
        unlocked = exists and report.status == ReportStatus.COMPLETED

        summary = {
            "simulation_id": simulation_id,
            "has_report": exists,
            "report_status": status_value,
            "report_id": found_id,
            "interview_unlocked": unlocked
        }
        return jsonify({
            "success": True,
            "data": summary
        })
    except Exception as exc:
        logger.error(f"Failed to check report status: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Agent log endpoints ==============
@report_bp.route('/<report_id>/agent-log', methods=['GET'])
def get_agent_log(report_id: str):
    """
    Return the Report Agent's detailed execution log.

    The log records the step-by-step actions taken during report generation,
    including:
    - Report start, planning start/complete
    - Each section's start, tool calls, LLM response, completion
    - Report completion or failure

    Query parameters:
        from_line: start reading from this line (optional, default 0, for incremental fetch)

    Returns:
        {
            "success": true,
            "data": {
                "logs": [
                    {
                        "timestamp": "2025-12-13T...",
                        "elapsed_seconds": 12.5,
                        "report_id": "report_xxxx",
                        "action": "tool_call",
                        "stage": "generating",
                        "section_title": "Executive Summary",
                        "section_index": 1,
                        "details": {
                            "tool_name": "insight_forge",
                            "parameters": {...},
                            ...
                        }
                    },
                    ...
                ],
                "total_lines": 25,
                "from_line": 0,
                "has_more": false
            }
        }
    """
    try:
        start_line = request.args.get('from_line', 0, type=int)
        entries = ReportManager.get_agent_log(report_id, from_line=start_line)
        return jsonify({
            "success": True,
            "data": entries
        })
    except Exception as exc:
        logger.error(f"Failed to get Agent log: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/agent-log/stream', methods=['GET'])
def stream_agent_log(report_id: str):
    """
    Return the complete Agent log in a single response.

    Returns:
        {
            "success": true,
            "data": {
                "logs": [...],
                "count": 25
            }
        }
    """
    try:
        entries = ReportManager.get_agent_log_stream(report_id)
        body = {
            "logs": entries,
            "count": len(entries)
        }
        return jsonify({
            "success": True,
            "data": body
        })
    except Exception as exc:
        logger.error(f"Failed to get Agent log: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Console log endpoints ==============
@report_bp.route('/<report_id>/console-log', methods=['GET'])
def get_console_log(report_id: str):
    """
    Return the Report Agent's console output log.

    This is the console output (INFO, WARNING, etc.) produced during report
    generation. Unlike the agent-log endpoint, which returns structured JSON
    logs, this one returns plain-text console-style lines.

    Query parameters:
        from_line: start reading from this line (optional, default 0, for incremental fetch)

    Returns:
        {
            "success": true,
            "data": {
                "logs": [
                    "[19:46:14] INFO: Search complete: found 15 relevant facts",
                    "[19:46:14] INFO: Graph search: graph_id=xxx, query=...",
                    ...
                ],
                "total_lines": 100,
                "from_line": 0,
                "has_more": false
            }
        }
    """
    try:
        start_line = request.args.get('from_line', 0, type=int)
        console = ReportManager.get_console_log(report_id, from_line=start_line)
        return jsonify({
            "success": True,
            "data": console
        })
    except Exception as exc:
        logger.error(f"Failed to get console log: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/console-log/stream', methods=['GET'])
def stream_console_log(report_id: str):
    """
    Return the complete console log in a single response.

    Returns:
        {
            "success": true,
            "data": {
                "logs": [...],
                "count": 100
            }
        }
    """
    try:
        lines = ReportManager.get_console_log_stream(report_id)
        body = {
            "logs": lines,
            "count": len(lines)
        }
        return jsonify({
            "success": True,
            "data": body
        })
    except Exception as exc:
        logger.error(f"Failed to get console log: {exc!s}")
        failure = {
            "success": False,
            "error": str(exc),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Tool call endpoints (for debugging) ==============
@report_bp.route('/tools/search', methods=['POST'])
def search_graph_tool():
    """
    Graph search tool endpoint (for debugging)

    Request (JSON):
        {
            "graph_id": "mirofish_xxxx",
            "query": "search query",
            "limit": 10
        }

    Returns:
        {"success": true, "data": <search result as a dict>}

    Errors:
        400 - graph_id or query is missing
        500 - unexpected failure (the response includes the traceback)
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is answered with the 400 below rather than a 500.
        payload = request.get_json(silent=True)
        # A JSON array/scalar has no .get(); treat it like an empty object.
        data = payload if isinstance(payload, dict) else {}

        graph_id = data.get('graph_id')
        query = data.get('query')
        limit = data.get('limit', 10)

        if not graph_id or not query:
            return jsonify({
                "success": False,
                "error": t('api.requireGraphIdAndQuery')
            }), 400

        # Imported here rather than at module level, as in the original.
        from ..services.zep_tools import ZepToolsService

        tools = ZepToolsService()
        result = tools.search_graph(
            graph_id=graph_id,
            query=query,
            limit=limit
        )

        return jsonify({
            "success": True,
            "data": result.to_dict()
        })
    except Exception as e:
        logger.error(f"Graph search failed: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/tools/statistics', methods=['POST'])
def get_graph_statistics_tool():
    """
    Graph statistics tool endpoint (for debugging)

    Request (JSON):
        {
            "graph_id": "mirofish_xxxx"
        }

    Returns:
        {"success": true, "data": <statistics returned by the tools service>}

    Errors:
        400 - graph_id is missing
        500 - unexpected failure (the response includes the traceback)
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is answered with the 400 below rather than a 500.
        payload = request.get_json(silent=True)
        # A JSON array/scalar has no .get(); treat it like an empty object.
        data = payload if isinstance(payload, dict) else {}

        graph_id = data.get('graph_id')
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.requireGraphId')
            }), 400

        # Imported here rather than at module level, as in the original.
        from ..services.zep_tools import ZepToolsService

        tools = ZepToolsService()
        result = tools.get_graph_statistics(graph_id)

        return jsonify({
            "success": True,
            "data": result
        })
    except Exception as e:
        logger.error(f"Failed to get graph statistics: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500