# MicroFish/backend/app/api/report.py
#
# 1086 lines
# 32 KiB
# Python

"""
Report API routes
Provides simulation report generation, retrieval, and chat endpoints
"""
import io
import os
import tempfile
import traceback
import threading
import markdown as md_lib
import fitz # PyMuPDF
from flask import request, jsonify, send_file
from . import report_bp
from ..config import Config
from ..services.report_agent import ReportAgent, ReportManager, ReportStatus
from ..services.simulation_manager import SimulationManager
from ..models.project import ProjectManager
from ..models.task import TaskManager, TaskStatus
from ..utils.logger import get_logger
from ..utils.locale import t, get_locale, set_locale
logger = get_logger('mirofish.api.report')
# ============== Report generation endpoints ==============
@report_bp.route('/generate', methods=['POST'])
def generate_report():
    """
    Start generating a simulation analysis report (async task).

    Generation is long-running, so the work runs in a daemon thread and this
    endpoint returns the task_id / report_id immediately. Progress is polled
    through the /generate/status route, which this module registers for POST.

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",   // required, simulation ID
            "force_regenerate": false      // optional, force regeneration
        }

    Returns:
        {
            "success": true,
            "data": {
                "simulation_id": "sim_xxxx",
                "report_id": "report_xxxx",
                "task_id": "task_xxxx",
                "status": "generating",
                "message": "...",
                "already_generated": false
            }
        }
        When a completed report already exists and force_regenerate is
        falsy, "status" is "completed", "already_generated" is true and no
        task is created (the payload then has no "task_id").

    Error responses:
        400 - simulation_id, graph ID or simulation requirement is missing
              (a missing / malformed / non-object JSON body counts as a
              missing simulation_id)
        404 - simulation or project not found
        500 - any unexpected exception (error text plus traceback)
    """
    try:
        # silent=True returns None instead of raising on a missing or invalid
        # JSON body; without it the blanket `except` below would turn a
        # client mistake into a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # e.g. a JSON array or scalar: treat it like an empty object so
            # the normal "simulation_id required" validation applies.
            data = {}

        simulation_id = data.get('simulation_id')
        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        force_regenerate = data.get('force_regenerate', False)

        # Get simulation info
        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        # Short-circuit when a completed report already exists
        if not force_regenerate:
            existing_report = ReportManager.get_report_by_simulation(simulation_id)
            if existing_report and existing_report.status == ReportStatus.COMPLETED:
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "report_id": existing_report.report_id,
                        "status": "completed",
                        "message": t('api.reportAlreadyExists'),
                        "already_generated": True
                    }
                })

        # Get project info
        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": t('api.projectNotFound', id=state.project_id)
            }), 404

        # The simulation's own graph wins; fall back to the project's graph
        graph_id = state.graph_id or project.graph_id
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.missingGraphIdEnsure')
            }), 400

        simulation_requirement = project.simulation_requirement
        if not simulation_requirement:
            return jsonify({
                "success": False,
                "error": t('api.missingSimRequirement')
            }), 400

        # Pre-generate report_id so it can be returned immediately
        import uuid
        report_id = f"report_{uuid.uuid4().hex[:12]}"

        # Create async task
        task_manager = TaskManager()
        task_id = task_manager.create_task(
            task_type="report_generate",
            metadata={
                "simulation_id": simulation_id,
                "graph_id": graph_id,
                "report_id": report_id
            }
        )

        # Capture the locale now so it can be re-applied inside the worker
        # thread via set_locale()
        current_locale = get_locale()

        def run_generate():
            """Worker-thread body: run the agent and record the outcome on the task."""
            set_locale(current_locale)
            try:
                task_manager.update_task(
                    task_id,
                    status=TaskStatus.PROCESSING,
                    progress=0,
                    message=t('api.initReportAgent')
                )

                # Create Report Agent
                agent = ReportAgent(
                    graph_id=graph_id,
                    simulation_id=simulation_id,
                    simulation_requirement=simulation_requirement
                )

                def progress_callback(stage, progress, message):
                    """Mirror agent progress onto the task record."""
                    task_manager.update_task(
                        task_id,
                        progress=progress,
                        message=f"[{stage}] {message}"
                    )

                # Generate report (pass pre-generated report_id)
                report = agent.generate_report(
                    progress_callback=progress_callback,
                    report_id=report_id
                )

                # Save report
                ReportManager.save_report(report)

                if report.status == ReportStatus.COMPLETED:
                    task_manager.complete_task(
                        task_id,
                        result={
                            "report_id": report.report_id,
                            "simulation_id": simulation_id,
                            "status": "completed"
                        }
                    )
                else:
                    task_manager.fail_task(task_id, report.error or t('api.reportGenerateFailed'))
            except Exception as e:
                logger.error(f"Report generation failed: {str(e)}")
                # The HTTP response was sent long ago, so the log is the only
                # place this stack trace can surface.
                logger.error(traceback.format_exc())
                task_manager.fail_task(task_id, str(e))

        # Start background thread
        thread = threading.Thread(target=run_generate, daemon=True)
        thread.start()

        return jsonify({
            "success": True,
            "data": {
                "simulation_id": simulation_id,
                "report_id": report_id,
                "task_id": task_id,
                "status": "generating",
                "message": t('api.reportGenerateStarted'),
                "already_generated": False
            }
        })
    except Exception as e:
        logger.error(f"Failed to start report generation task: {str(e)}")
        # NOTE(review): the traceback is returned to the client; kept because
        # it is this module's response convention — confirm it is intended
        # outside of debugging.
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/generate/status', methods=['POST'])
def get_generate_status():
    """
    Query report generation task progress.

    Request (JSON):
        {
            "task_id": "task_xxxx",        // optional, task_id from generate
            "simulation_id": "sim_xxxx"    // optional, simulation ID
        }

    Lookup order:
        1. If simulation_id is given and that simulation already has a
           completed report, a synthetic "completed" payload is returned
           (progress 100, "already_completed": true) without touching the
           task store.
        2. Otherwise task_id is required and the task's to_dict() is returned.

    Returns:
        {
            "success": true,
            "data": {
                "task_id": "task_xxxx",
                "status": "processing|completed|failed",
                "progress": 45,
                "message": "..."
            }
        }

    Error responses:
        400 - no task_id and no completed report for the given simulation
              (also the result of a missing / malformed / non-object body)
        404 - task_id is unknown
        500 - any unexpected exception
    """
    try:
        # silent=True: a missing or invalid JSON body becomes None rather
        # than an exception that the blanket handler would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        task_id = data.get('task_id')
        simulation_id = data.get('simulation_id')

        # If simulation_id is provided, check whether a completed report exists
        if simulation_id:
            existing_report = ReportManager.get_report_by_simulation(simulation_id)
            if existing_report and existing_report.status == ReportStatus.COMPLETED:
                return jsonify({
                    "success": True,
                    "data": {
                        "simulation_id": simulation_id,
                        "report_id": existing_report.report_id,
                        "status": "completed",
                        "progress": 100,
                        "message": t('api.reportGenerated'),
                        "already_completed": True
                    }
                })

        if not task_id:
            return jsonify({
                "success": False,
                "error": t('api.requireTaskOrSimId')
            }), 400

        task_manager = TaskManager()
        task = task_manager.get_task(task_id)
        if not task:
            return jsonify({
                "success": False,
                "error": t('api.taskNotFound', id=task_id)
            }), 404

        return jsonify({
            "success": True,
            "data": task.to_dict()
        })
    except Exception as e:
        logger.error(f"Failed to query task status: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e)
        }), 500
# ============== Report retrieval endpoints ==============
@report_bp.route('/<report_id>', methods=['GET'])
def get_report(report_id: str):
    """
    Return the details of a single report.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {"success": true, "data": <report.to_dict()>}
        404 - {"success": false, "error": "..."} when ReportManager has no
              report for this ID
        500 - error text plus traceback on any unexpected exception
    """
    try:
        report = ReportManager.get_report(report_id)
        if report:
            payload = {
                "success": True,
                "data": report.to_dict()
            }
            return jsonify(payload)

        # Falsy lookup result: report does not exist
        not_found = {
            "success": False,
            "error": t('api.reportNotFound', id=report_id)
        }
        return jsonify(not_found), 404
    except Exception as e:
        logger.error(f"Failed to get report: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/by-simulation/<simulation_id>', methods=['GET'])
def get_report_by_simulation(simulation_id: str):
    """
    Return the report that belongs to a simulation.

    Args:
        simulation_id: Simulation identifier taken from the URL path.

    Returns:
        200 - {"success": true, "data": <report.to_dict()>, "has_report": true}
        404 - {"success": false, "error": "...", "has_report": false} when
              the simulation has no report
        500 - error text plus traceback on any unexpected exception
    """
    try:
        report = ReportManager.get_report_by_simulation(simulation_id)
        if report:
            payload = {
                "success": True,
                "data": report.to_dict(),
                "has_report": True
            }
            return jsonify(payload)

        missing = {
            "success": False,
            "error": t('api.noReportForSim', id=simulation_id),
            "has_report": False
        }
        return jsonify(missing), 404
    except Exception as e:
        logger.error(f"Failed to get report: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/list', methods=['GET'])
def list_reports():
    """
    List reports, optionally filtered by simulation.

    Query parameters:
        simulation_id: only return reports of this simulation (optional)
        limit: maximum number of results (default 50, parsed as int; a value
            that fails int parsing falls back to the default)

    Returns:
        200 - {"success": true, "data": [<report.to_dict()>, ...], "count": N}
        500 - error text plus traceback on any unexpected exception
    """
    try:
        query = request.args
        simulation_filter = query.get('simulation_id')
        max_results = query.get('limit', 50, type=int)

        reports = ReportManager.list_reports(
            simulation_id=simulation_filter,
            limit=max_results
        )

        serialized = [r.to_dict() for r in reports]
        return jsonify({
            "success": True,
            "data": serialized,
            "count": len(reports)
        })
    except Exception as e:
        logger.error(f"Failed to list reports: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
def _generate_pdf_bytes(markdown_content: str) -> bytes:
    """
    Render a Markdown string as an A4 PDF and return the raw PDF bytes.

    The Markdown is converted to HTML (with the 'tables' and 'fenced_code'
    extensions), wrapped in a small styled HTML document, and laid out page
    by page with PyMuPDF's Story / DocumentWriter into an in-memory buffer.

    Args:
        markdown_content: Markdown source of the report.

    Returns:
        The complete PDF document as bytes.
    """
    rendered_body = md_lib.markdown(
        markdown_content,
        extensions=['tables', 'fenced_code']
    )
    html_body = rendered_body
    html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: sans-serif; font-size: 12pt; line-height: 1.6;
margin: 40px; color: #1a1a1a; }}
h1 {{ font-size: 22pt; border-bottom: 2px solid #333; padding-bottom: 6px; }}
h2 {{ font-size: 16pt; margin-top: 28px; }}
h3 {{ font-size: 13pt; }}
pre {{ background: #f4f4f4; padding: 10px; border-radius: 4px;
font-size: 10pt; overflow-x: auto; }}
code {{ background: #f4f4f4; padding: 1px 4px; border-radius: 3px; font-size: 10pt; }}
blockquote {{ border-left: 3px solid #aaa; margin-left: 0;
padding-left: 12px; color: #555; }}
table {{ border-collapse: collapse; width: 100%; margin: 12px 0; }}
th, td {{ border: 1px solid #ccc; padding: 6px 10px; text-align: left; }}
th {{ background: #f0f0f0; }}
</style>
</head>
<body>{html_body}</body>
</html>"""

    story = fitz.Story(html)
    pdf_buffer = io.BytesIO()
    writer = fitz.DocumentWriter(pdf_buffer)

    page_rect = fitz.paper_rect("a4")
    # Shrink the page rectangle by 36 units on every side to get the
    # printable area (margins)
    content_rect = page_rect + (36, 36, -36, -36)

    # Emit pages until the story reports that nothing is left to place
    while True:
        device = writer.begin_page(page_rect)
        has_more, _ = story.place(content_rect)
        story.draw(device)
        writer.end_page()
        if not has_more:
            break

    writer.close()
    return pdf_buffer.getvalue()
@report_bp.route('/<report_id>/download', methods=['GET'])
def download_report(report_id: str):
    """
    Download a report as Markdown or PDF.

    Args:
        report_id: Report identifier taken from the URL path.

    Query params:
        format: 'md' (default) | 'pdf' (case-insensitive)

    Behavior:
        - 'md': the report's Markdown file is sent straight from disk when it
          exists; otherwise the report's in-memory markdown_content is
          streamed from a buffer (no temporary file is created).
        - 'pdf': the Markdown (file on disk preferred, markdown_content as
          fallback) is converted with _generate_pdf_bytes and streamed.
        A missing (None) markdown_content is treated as an empty document.

    Returns:
        200 - the file as an attachment named "<report_id>.md" / ".pdf"
        400 - unsupported format
        404 - report not found
        500 - error text plus traceback on any unexpected exception
    """
    try:
        fmt = request.args.get('format', 'md').lower()
        if fmt not in ('md', 'pdf'):
            return jsonify({
                "success": False,
                "error": f"Unsupported format '{fmt}'. Use 'md' or 'pdf'."
            }), 400

        report = ReportManager.get_report(report_id)
        if not report:
            return jsonify({
                "success": False,
                "error": t('api.reportNotFound', id=report_id)
            }), 404

        md_path = ReportManager._get_report_markdown_path(report_id)
        md_on_disk = os.path.exists(md_path)

        # Fast path: the Markdown file can be sent as-is, no need to read it
        if fmt == 'md' and md_on_disk:
            return send_file(
                md_path,
                as_attachment=True,
                download_name=f"{report_id}.md"
            )

        if md_on_disk:
            with open(md_path, 'r', encoding='utf-8') as f:
                markdown_content = f.read()
        else:
            # Guard against a report whose content has not been set yet
            markdown_content = report.markdown_content or ""

        if fmt == 'md':
            # Stream from memory instead of a NamedTemporaryFile(delete=False),
            # which was never removed and leaked one file per download.
            # The MIME type is still guessed from download_name.
            return send_file(
                io.BytesIO(markdown_content.encode('utf-8')),
                as_attachment=True,
                download_name=f"{report_id}.md"
            )

        # fmt == 'pdf'
        pdf_bytes = _generate_pdf_bytes(markdown_content)
        return send_file(
            io.BytesIO(pdf_bytes),
            mimetype='application/pdf',
            as_attachment=True,
            download_name=f"{report_id}.pdf"
        )
    except Exception as e:
        logger.error(f"Failed to download report: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/<report_id>', methods=['DELETE'])
def delete_report(report_id: str):
    """
    Delete a report.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {"success": true, "message": "..."} when
              ReportManager.delete_report returns a truthy value
        404 - {"success": false, "error": "..."} when it returns a falsy value
        500 - error text plus traceback on any unexpected exception
    """
    try:
        deleted = ReportManager.delete_report(report_id)
        if deleted:
            confirmation = {
                "success": True,
                "message": t('api.reportDeleted', id=report_id)
            }
            return jsonify(confirmation)

        not_found = {
            "success": False,
            "error": t('api.reportNotFound', id=report_id)
        }
        return jsonify(not_found), 404
    except Exception as e:
        logger.error(f"Failed to delete report: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Report Agent chat endpoint ==============
@report_bp.route('/chat', methods=['POST'])
def chat_with_report_agent():
    """
    Chat with the Report Agent.

    A ReportAgent is created for the simulation's graph and asked to answer
    the user's message; its result is returned unchanged under "data".

    Request (JSON):
        {
            "simulation_id": "sim_xxxx",        // required, simulation ID
            "message": "Explain the trend...",  // required, user message
            "chat_history": [                   // optional, conversation history
                {"role": "user", "content": "..."},
                {"role": "assistant", "content": "..."}
            ]
        }
        A missing or null chat_history is passed to the agent as [].

    Returns:
        {
            "success": true,
            "data": {
                "response": "Agent reply...",
                "tool_calls": [list of tools called],
                "sources": [information sources]
            }
        }

    Error responses:
        400 - simulation_id, message or graph ID is missing (a missing /
              malformed / non-object JSON body counts as missing fields)
        404 - simulation or project not found
        500 - any unexpected exception (error text plus traceback)
    """
    try:
        # silent=True: a missing or invalid JSON body becomes None rather
        # than an exception that the blanket handler would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        simulation_id = data.get('simulation_id')
        message = data.get('message')
        # `or []` also covers an explicit "chat_history": null, which
        # dict.get's default would let through as None.
        chat_history = data.get('chat_history') or []

        if not simulation_id:
            return jsonify({
                "success": False,
                "error": t('api.requireSimulationId')
            }), 400

        if not message:
            return jsonify({
                "success": False,
                "error": t('api.requireMessage')
            }), 400

        # Get simulation and project info
        manager = SimulationManager()
        state = manager.get_simulation(simulation_id)
        if not state:
            return jsonify({
                "success": False,
                "error": t('api.simulationNotFound', id=simulation_id)
            }), 404

        project = ProjectManager.get_project(state.project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": t('api.projectNotFound', id=state.project_id)
            }), 404

        # The simulation's own graph wins; fall back to the project's graph
        graph_id = state.graph_id or project.graph_id
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.missingGraphId')
            }), 400

        # Unlike report generation, chat tolerates an empty requirement
        simulation_requirement = project.simulation_requirement or ""

        # Create agent and start chat
        agent = ReportAgent(
            graph_id=graph_id,
            simulation_id=simulation_id,
            simulation_requirement=simulation_requirement
        )
        result = agent.chat(message=message, chat_history=chat_history)

        return jsonify({
            "success": True,
            "data": result
        })
    except Exception as e:
        logger.error(f"Chat failed: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== Report progress and section endpoints ==============
@report_bp.route('/<report_id>/progress', methods=['GET'])
def get_report_progress(report_id: str):
    """
    Return the current generation progress of a report.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {"success": true, "data": <progress>} where <progress> is what
              ReportManager.get_progress returns; the documented shape is
              {
                  "status": "generating",
                  "progress": 45,
                  "message": "Generating section: Key Findings",
                  "current_section": "Key Findings",
                  "completed_sections": ["Executive Summary", ...],
                  "updated_at": "2025-12-09T..."
              }
        404 - no progress information is available for this report
        500 - error text plus traceback on any unexpected exception
    """
    try:
        progress = ReportManager.get_progress(report_id)
        if progress:
            return jsonify({
                "success": True,
                "data": progress
            })

        unavailable = {
            "success": False,
            "error": t('api.reportProgressNotAvail', id=report_id)
        }
        return jsonify(unavailable), 404
    except Exception as e:
        logger.error(f"Failed to get report progress: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/sections', methods=['GET'])
def get_report_sections(report_id: str):
    """
    Return the sections of a report that have been generated so far.

    Intended for polling: the frontend can show sections as they are
    produced instead of waiting for the whole report.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {
                  "success": true,
                  "data": {
                      "report_id": "report_xxxx",
                      "sections": [...],       // from ReportManager.get_generated_sections
                      "total_sections": 3,     // len(sections)
                      "is_complete": false     // report exists and is COMPLETED
                  }
              }
        500 - error text plus traceback on any unexpected exception
    """
    try:
        sections = ReportManager.get_generated_sections(report_id)

        # The report may not exist yet; in that case it is simply not complete
        report = ReportManager.get_report(report_id)
        is_complete = report is not None and report.status == ReportStatus.COMPLETED

        section_data = {
            "report_id": report_id,
            "sections": sections,
            "total_sections": len(sections),
            "is_complete": is_complete
        }
        return jsonify({
            "success": True,
            "data": section_data
        })
    except Exception as e:
        logger.error(f"Failed to get section list: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/section/<int:section_index>', methods=['GET'])
def get_single_section(report_id: str, section_index: int):
    """
    Return the Markdown content of one generated section.

    Args:
        report_id: Report identifier taken from the URL path.
        section_index: Section number from the URL path (int converter).

    Returns:
        200 - {
                  "success": true,
                  "data": {
                      "filename": "section_01.md",
                      "section_index": 1,
                      "content": "..."
                  }
              }
        404 - the section file does not exist
        500 - error text plus traceback on any unexpected exception
    """
    try:
        # Two-digit, zero-padded form used in both the filename and the error
        index_label = f"{section_index:02d}"

        section_path = ReportManager._get_section_path(report_id, section_index)
        if not os.path.exists(section_path):
            not_found = {
                "success": False,
                "error": t('api.sectionNotFound', index=index_label)
            }
            return jsonify(not_found), 404

        with open(section_path, 'r', encoding='utf-8') as section_file:
            content = section_file.read()

        section_data = {
            "filename": f"section_{index_label}.md",
            "section_index": section_index,
            "content": content
        }
        return jsonify({
            "success": True,
            "data": section_data
        })
    except Exception as e:
        logger.error(f"Failed to get section content: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Report status check endpoint ==============
@report_bp.route('/check/<simulation_id>', methods=['GET'])
def check_report_status(simulation_id: str):
    """
    Report whether a simulation has a report, and in which state.

    The frontend uses "interview_unlocked" to decide whether the Interview
    feature may be used; it is true only for a completed report.

    Args:
        simulation_id: Simulation identifier taken from the URL path.

    Returns:
        200 - {
                  "success": true,
                  "data": {
                      "simulation_id": "sim_xxxx",
                      "has_report": true,
                      "report_status": "completed",   // null without a report
                      "report_id": "report_xxxx",     // null without a report
                      "interview_unlocked": true
                  }
              }
        500 - error text plus traceback on any unexpected exception
    """
    try:
        report = ReportManager.get_report_by_simulation(simulation_id)
        has_report = report is not None

        report_status = None
        report_id = None
        if report:
            report_status = report.status.value
            report_id = report.report_id

        # Interview is unlocked only after the report is complete
        interview_unlocked = has_report and report.status == ReportStatus.COMPLETED

        status_data = {
            "simulation_id": simulation_id,
            "has_report": has_report,
            "report_status": report_status,
            "report_id": report_id,
            "interview_unlocked": interview_unlocked
        }
        return jsonify({
            "success": True,
            "data": status_data
        })
    except Exception as e:
        logger.error(f"Failed to check report status: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Agent log endpoints ==============
@report_bp.route('/<report_id>/agent-log', methods=['GET'])
def get_agent_log(report_id: str):
    """
    Return the structured execution log of the Report Agent.

    The log records the agent's step-by-step actions during report
    generation (report start, planning, per-section tool calls and LLM
    responses, completion or failure). Supports incremental fetching.

    Args:
        report_id: Report identifier taken from the URL path.

    Query parameters:
        from_line: line to start reading from (optional, default 0)

    Returns:
        200 - {"success": true, "data": <log_data>} where <log_data> is the
              value of ReportManager.get_agent_log; the documented shape is
              {
                  "logs": [
                      {
                          "timestamp": "2025-12-13T...",
                          "elapsed_seconds": 12.5,
                          "report_id": "report_xxxx",
                          "action": "tool_call",
                          "stage": "generating",
                          "section_title": "Executive Summary",
                          "section_index": 1,
                          "details": {...}
                      },
                      ...
                  ],
                  "total_lines": 25,
                  "from_line": 0,
                  "has_more": false
              }
        500 - error text plus traceback on any unexpected exception
    """
    try:
        start_line = request.args.get('from_line', 0, type=int)
        log_data = ReportManager.get_agent_log(report_id, from_line=start_line)
        payload = {
            "success": True,
            "data": log_data
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Failed to get Agent log: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/agent-log/stream', methods=['GET'])
def stream_agent_log(report_id: str):
    """
    Return the complete Agent log in a single response.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {"success": true, "data": {"logs": [...], "count": 25}} where
              "logs" comes from ReportManager.get_agent_log_stream and
              "count" is its length
        500 - error text plus traceback on any unexpected exception
    """
    try:
        logs = ReportManager.get_agent_log_stream(report_id)
        log_bundle = {
            "logs": logs,
            "count": len(logs)
        }
        return jsonify({
            "success": True,
            "data": log_bundle
        })
    except Exception as e:
        logger.error(f"Failed to get Agent log: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Console log endpoints ==============
@report_bp.route('/<report_id>/console-log', methods=['GET'])
def get_console_log(report_id: str):
    """
    Return the console output log of the Report Agent.

    Unlike the agent-log endpoint, which returns structured JSON entries,
    this one returns plain-text console-style lines (INFO, WARNING, etc.)
    written during report generation. Supports incremental fetching.

    Args:
        report_id: Report identifier taken from the URL path.

    Query parameters:
        from_line: line to start reading from (optional, default 0)

    Returns:
        200 - {"success": true, "data": <log_data>} where <log_data> is the
              value of ReportManager.get_console_log; the documented shape is
              {
                  "logs": [
                      "[19:46:14] INFO: Search complete: found 15 relevant facts",
                      ...
                  ],
                  "total_lines": 100,
                  "from_line": 0,
                  "has_more": false
              }
        500 - error text plus traceback on any unexpected exception
    """
    try:
        start_line = request.args.get('from_line', 0, type=int)
        log_data = ReportManager.get_console_log(report_id, from_line=start_line)
        payload = {
            "success": True,
            "data": log_data
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Failed to get console log: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
@report_bp.route('/<report_id>/console-log/stream', methods=['GET'])
def stream_console_log(report_id: str):
    """
    Return the complete console log in a single response.

    Args:
        report_id: Report identifier taken from the URL path.

    Returns:
        200 - {"success": true, "data": {"logs": [...], "count": 100}} where
              "logs" comes from ReportManager.get_console_log_stream and
              "count" is its length
        500 - error text plus traceback on any unexpected exception
    """
    try:
        logs = ReportManager.get_console_log_stream(report_id)
        log_bundle = {
            "logs": logs,
            "count": len(logs)
        }
        return jsonify({
            "success": True,
            "data": log_bundle
        })
    except Exception as e:
        logger.error(f"Failed to get console log: {str(e)}")
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return jsonify(failure), 500
# ============== Tool call endpoints (for debugging) ==============
@report_bp.route('/tools/search', methods=['POST'])
def search_graph_tool():
    """
    Graph search tool endpoint (for debugging).

    Runs ZepToolsService.search_graph directly, outside of the Report Agent.

    Request (JSON):
        {
            "graph_id": "mirofish_xxxx",   // required
            "query": "search query",       // required
            "limit": 10                    // optional, default 10
        }

    Returns:
        200 - {"success": true, "data": <result.to_dict()>}
        400 - graph_id or query is missing (a missing / malformed /
              non-object JSON body counts as missing fields)
        500 - error text plus traceback on any unexpected exception
    """
    try:
        # silent=True: a missing or invalid JSON body becomes None rather
        # than an exception that the blanket handler would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        graph_id = data.get('graph_id')
        query = data.get('query')
        limit = data.get('limit', 10)

        if not graph_id or not query:
            return jsonify({
                "success": False,
                "error": t('api.requireGraphIdAndQuery')
            }), 400

        # Imported here rather than at module level, as in the original
        from ..services.zep_tools import ZepToolsService

        tools = ZepToolsService()
        result = tools.search_graph(
            graph_id=graph_id,
            query=query,
            limit=limit
        )

        return jsonify({
            "success": True,
            "data": result.to_dict()
        })
    except Exception as e:
        logger.error(f"Graph search failed: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@report_bp.route('/tools/statistics', methods=['POST'])
def get_graph_statistics_tool():
    """
    Graph statistics tool endpoint (for debugging).

    Runs ZepToolsService.get_graph_statistics directly, outside of the
    Report Agent.

    Request (JSON):
        {
            "graph_id": "mirofish_xxxx"    // required
        }

    Returns:
        200 - {"success": true, "data": <statistics>} with the value returned
              by get_graph_statistics passed through unchanged
        400 - graph_id is missing (a missing / malformed / non-object JSON
              body counts as a missing graph_id)
        500 - error text plus traceback on any unexpected exception
    """
    try:
        # silent=True: a missing or invalid JSON body becomes None rather
        # than an exception that the blanket handler would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        graph_id = data.get('graph_id')
        if not graph_id:
            return jsonify({
                "success": False,
                "error": t('api.requireGraphId')
            }), 400

        # Imported here rather than at module level, as in the original
        from ..services.zep_tools import ZepToolsService

        tools = ZepToolsService()
        result = tools.get_graph_statistics(graph_id)

        return jsonify({
            "success": True,
            "data": result
        })
    except Exception as e:
        logger.error(f"Failed to get graph statistics: {str(e)}")
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500