feat(project): refactor ProjectManager to persist via SQLAlchemy + StorageService

Replace file-based JSON persistence with SQLAlchemy DB (ProjectModel/ProjectFileModel)
and StorageService for file content; remove Project dataclass; add 7 passing tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-05-03 00:11:12 +00:00
parent 1f43d35d59
commit 2478beb9ea
3 changed files with 225 additions and 282 deletions

View File

@ -3,7 +3,7 @@ Data models module
""" """
from .task import TaskManager, TaskStatus from .task import TaskManager, TaskStatus
from .project import Project, ProjectStatus, ProjectManager from .project import ProjectStatus, ProjectManager
__all__ = ['TaskManager', 'TaskStatus', 'Project', 'ProjectStatus', 'ProjectManager'] __all__ = ['TaskManager', 'TaskStatus', 'ProjectStatus', 'ProjectManager']

View File

@ -1,310 +1,173 @@
""" """Project context management — persistent via SQLAlchemy + StorageService."""
Project context management
Persists project state server-side so the frontend does not need to pass large amounts of data between endpoints.
"""
import os
import json
import uuid import uuid
import shutil import io
from datetime import datetime from datetime import datetime, timezone
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
from enum import Enum from enum import Enum
from dataclasses import dataclass, field, asdict
from ..config import Config from ..db import get_session
from ..models.db_models import ProjectModel, ProjectFileModel
class ProjectStatus(str, Enum): class ProjectStatus(str, Enum):
"""Project status""" CREATED = "created"
CREATED = "created" # Just created; files uploaded ONTOLOGY_GENERATED = "ontology_generated"
ONTOLOGY_GENERATED = "ontology_generated" # Ontology generated GRAPH_BUILDING = "graph_building"
GRAPH_BUILDING = "graph_building" # Graph building in progress GRAPH_COMPLETED = "graph_completed"
GRAPH_COMPLETED = "graph_completed" # Graph build complete FAILED = "failed"
FAILED = "failed" # Failed
@dataclass
class Project:
    """Server-side record of a single project and its processing state.

    Timestamps are kept as strings. ``status`` is normally a ``ProjectStatus``
    member; ``to_dict`` also tolerates a raw value being stored there.
    """

    project_id: str
    name: str
    status: ProjectStatus
    created_at: str
    updated_at: str

    # Uploaded files, one dict per file: {filename, path, size}.
    files: List[Dict[str, str]] = field(default_factory=list)
    total_text_length: int = 0

    # Ontology results (populated after endpoint 1).
    ontology: Optional[Dict[str, Any]] = None
    analysis_summary: Optional[str] = None

    # Graph results (populated after endpoint 2 completes).
    graph_id: Optional[str] = None
    graph_build_task_id: Optional[str] = None

    # Processing configuration.
    simulation_requirement: Optional[str] = None
    chunk_size: int = 500
    chunk_overlap: int = 50

    # Error info.
    error: Optional[str] = None

    # Persisted so the frontend can reconnect after a page refresh.
    active_task_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the project as a plain dict.

        ``status`` is emitted as the enum's value when it is a
        ``ProjectStatus`` member, and passed through unchanged otherwise.
        """
        if isinstance(self.status, ProjectStatus):
            status_value = self.status.value
        else:
            status_value = self.status
        return dict(
            project_id=self.project_id,
            name=self.name,
            status=status_value,
            created_at=self.created_at,
            updated_at=self.updated_at,
            files=self.files,
            total_text_length=self.total_text_length,
            ontology=self.ontology,
            analysis_summary=self.analysis_summary,
            graph_id=self.graph_id,
            graph_build_task_id=self.graph_build_task_id,
            simulation_requirement=self.simulation_requirement,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            error=self.error,
            active_task_id=self.active_task_id,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Project':
        """Build a Project from a dict such as the one produced by ``to_dict``.

        Only ``project_id`` is required; every other key falls back to a
        default when absent. A string ``status`` is converted to
        ``ProjectStatus``.
        """
        raw_status = data.get('status', 'created')
        status = ProjectStatus(raw_status) if isinstance(raw_status, str) else raw_status

        kwargs: Dict[str, Any] = {'project_id': data['project_id'], 'status': status}
        # (key, fallback) pairs for every optional field. The tuple is rebuilt
        # on each call, so the [] fallback is never shared between projects.
        optional_fields = (
            ('name', 'Unnamed Project'),
            ('created_at', ''),
            ('updated_at', ''),
            ('files', []),
            ('total_text_length', 0),
            ('ontology', None),
            ('analysis_summary', None),
            ('graph_id', None),
            ('graph_build_task_id', None),
            ('simulation_requirement', None),
            ('chunk_size', 500),
            ('chunk_overlap', 50),
            ('error', None),
            ('active_task_id', None),
        )
        for key, fallback in optional_fields:
            kwargs[key] = data.get(key, fallback)
        return cls(**kwargs)
class ProjectManager: class ProjectManager:
"""Project manager - handles persistent storage and retrieval of projects""" """Gestiona projectes: metadades a BD, fitxers a StorageService."""
# Root directory for project storage
PROJECTS_DIR = os.path.join(Config.UPLOAD_FOLDER, 'projects')
@classmethod @classmethod
def _ensure_projects_dir(cls): def create_project(cls, name: str = "Unnamed Project", storage=None) -> Dict[str, Any]:
"""Ensure the projects directory exists""" project_id = str(uuid.uuid4())
os.makedirs(cls.PROJECTS_DIR, exist_ok=True) with get_session() as db:
proj = ProjectModel(id=project_id, name=name, status="created")
db.add(proj)
db.commit()
db.refresh(proj)
return cls._to_dict(proj)
@classmethod @classmethod
def _get_project_dir(cls, project_id: str) -> str: def get_project(cls, project_id: str) -> Optional[Dict[str, Any]]:
"""Get project directory path""" with get_session() as db:
return os.path.join(cls.PROJECTS_DIR, project_id) proj = db.get(ProjectModel, project_id)
if proj is None:
return None
return cls._to_dict(proj)
@classmethod @classmethod
def _get_project_meta_path(cls, project_id: str) -> str: def save_project(cls, project_data: Dict[str, Any]) -> None:
"""Get project metadata file path""" """Actualitza els camps d'un projecte existent."""
return os.path.join(cls._get_project_dir(project_id), 'project.json') project_id = project_data.get("id") or project_data.get("project_id")
with get_session() as db:
proj = db.get(ProjectModel, project_id)
if proj is None:
return
updatable = [
"name", "status", "analysis_summary", "simulation_requirement",
"chunk_size", "chunk_overlap", "active_task_id",
]
for field in updatable:
if field in project_data:
setattr(proj, field, project_data[field])
proj.updated_at = datetime.now(timezone.utc)
db.commit()
@classmethod @classmethod
def _get_project_files_dir(cls, project_id: str) -> str: def list_projects(cls, limit: int = 50) -> List[Dict[str, Any]]:
"""Get project files storage directory""" from sqlalchemy import select, desc
return os.path.join(cls._get_project_dir(project_id), 'files') with get_session() as db:
stmt = select(ProjectModel).order_by(desc(ProjectModel.created_at)).limit(limit)
projects = db.execute(stmt).scalars().all()
return [cls._to_dict(p) for p in projects]
@classmethod @classmethod
def _get_project_text_path(cls, project_id: str) -> str: def delete_project(cls, project_id: str, storage=None) -> bool:
"""Get path for storing the extracted project text""" with get_session() as db:
return os.path.join(cls._get_project_dir(project_id), 'extracted_text.txt') proj = db.get(ProjectModel, project_id)
if proj is None:
@classmethod return False
def create_project(cls, name: str = "Unnamed Project") -> Project: if storage is not None:
""" storage.delete_prefix(f"projects/{project_id}")
Create a new project. db.delete(proj)
db.commit()
Args:
name: project name
Returns:
newly created Project object
"""
cls._ensure_projects_dir()
project_id = f"proj_{uuid.uuid4().hex[:12]}"
now = datetime.now().isoformat()
project = Project(
project_id=project_id,
name=name,
status=ProjectStatus.CREATED,
created_at=now,
updated_at=now
)
# Create project directory structure
project_dir = cls._get_project_dir(project_id)
files_dir = cls._get_project_files_dir(project_id)
os.makedirs(project_dir, exist_ok=True)
os.makedirs(files_dir, exist_ok=True)
# Save project metadata
cls.save_project(project)
return project
@classmethod
def save_project(cls, project: Project) -> None:
    """Write the project's metadata file as JSON.

    ``project.updated_at`` is overwritten with the current local time (ISO
    format) before the project is serialized.

    Args:
        project: project to persist; mutated in place (``updated_at``).
    """
    project.updated_at = datetime.now().isoformat()
    target_path = cls._get_project_meta_path(project.project_id)
    with open(target_path, 'w', encoding='utf-8') as meta_file:
        json.dump(project.to_dict(), meta_file, ensure_ascii=False, indent=2)
@classmethod
def get_project(cls, project_id: str) -> Optional[Project]:
    """Load a project from its metadata file.

    Args:
        project_id: project ID

    Returns:
        The Project parsed from the metadata JSON, or None when the metadata
        file does not exist.
    """
    meta_path = cls._get_project_meta_path(project_id)
    if os.path.exists(meta_path):
        with open(meta_path, 'r', encoding='utf-8') as meta_file:
            payload = json.load(meta_file)
        return Project.from_dict(payload)
    return None
@classmethod
def list_projects(cls, limit: int = 50) -> List[Project]:
    """List stored projects, newest first.

    Every entry of the projects directory is treated as a project ID;
    entries for which ``get_project`` returns nothing are skipped.

    Args:
        limit: maximum number of projects to return

    Returns:
        Up to ``limit`` projects sorted by ``created_at``, descending.
    """
    cls._ensure_projects_dir()
    candidates = (cls.get_project(entry) for entry in os.listdir(cls.PROJECTS_DIR))
    found = [project for project in candidates if project]
    newest_first = sorted(found, key=lambda project: project.created_at, reverse=True)
    return newest_first[:limit]
@classmethod
def delete_project(cls, project_id: str) -> bool:
"""
Delete a project and all its files.
Args:
project_id: project ID
Returns:
True if successfully deleted
"""
project_dir = cls._get_project_dir(project_id)
if not os.path.exists(project_dir):
return False
shutil.rmtree(project_dir)
return True return True
@classmethod @classmethod
def save_file_to_project(cls, project_id: str, file_storage, original_filename: str) -> Dict[str, str]: def save_file_to_project(
""" cls,
Save an uploaded file to the project directory. project_id: str,
file_storage, # Flask FileStorage
Args: original_filename: str,
project_id: project ID storage,
file_storage: Flask FileStorage object ) -> Dict[str, Any]:
original_filename: original filename import os
Returns:
file info dict {filename, path, size}
"""
files_dir = cls._get_project_files_dir(project_id)
os.makedirs(files_dir, exist_ok=True)
# Generate a safe filename
ext = os.path.splitext(original_filename)[1].lower() ext = os.path.splitext(original_filename)[1].lower()
safe_filename = f"{uuid.uuid4().hex[:8]}{ext}" safe_filename = f"{uuid.uuid4().hex[:8]}{ext}"
file_path = os.path.join(files_dir, safe_filename) storage_path = f"projects/{project_id}/files/{safe_filename}"
# Save file data = file_storage.read()
file_storage.save(file_path) storage.upload(storage_path, data)
mime_type = getattr(file_storage, "content_type", "application/octet-stream") or "application/octet-stream"
with get_session() as db:
file_rec = ProjectFileModel(
id=str(uuid.uuid4()),
project_id=project_id,
original_name=original_filename,
storage_path=storage_path,
size=len(data),
mime_type=mime_type,
file_type="upload",
)
db.add(file_rec)
db.commit()
# Get file size
file_size = os.path.getsize(file_path)
return { return {
"original_filename": original_filename, "original_filename": original_filename,
"saved_filename": safe_filename, "saved_filename": safe_filename,
"path": file_path, "storage_path": storage_path,
"size": file_size "size": len(data),
} }
@classmethod
def save_extracted_text(cls, project_id: str, text: str) -> None:
    """Write the extracted text to the project's text file as UTF-8.

    The file is opened in write mode, so any previous content is replaced.
    """
    destination = cls._get_project_text_path(project_id)
    with open(destination, 'w', encoding='utf-8') as text_file:
        text_file.write(text)
@classmethod
def get_extracted_text(cls, project_id: str) -> Optional[str]:
    """Return the project's extracted text, or None if its text file does not exist."""
    source = cls._get_project_text_path(project_id)
    if os.path.exists(source):
        with open(source, 'r', encoding='utf-8') as text_file:
            return text_file.read()
    return None
@classmethod
def get_project_files(cls, project_id: str) -> List[str]:
    """Return the paths of all regular files in the project's files directory.

    Returns an empty list when the directory does not exist. Entries that
    are not regular files (e.g. subdirectories) are left out.
    """
    files_dir = cls._get_project_files_dir(project_id)
    if not os.path.exists(files_dir):
        return []

    paths = []
    for entry in os.listdir(files_dir):
        candidate = os.path.join(files_dir, entry)
        if os.path.isfile(candidate):
            paths.append(candidate)
    return paths
@classmethod
def save_extracted_text(cls, project_id: str, text: str, storage) -> None:
    """Store a project's extracted text and record it in the database.

    The text is uploaded as UTF-8 to
    ``projects/<project_id>/extracted_text.txt``. The project's
    ``extracted_text`` file row is then updated if one exists; otherwise a
    new row is inserted.

    Args:
        project_id: ID of the project that owns the text.
        text: Extracted text to persist.
        storage: Storage backend; called as
            ``storage.upload(path, data_bytes, "text/plain")``.
    """
    from sqlalchemy import select

    storage_path = f"projects/{project_id}/extracted_text.txt"
    # Encode once: the same bytes are uploaded and measured for the size column.
    payload = text.encode("utf-8")
    payload_size = len(payload)
    storage.upload(storage_path, payload, "text/plain")

    with get_session() as db:
        stmt = select(ProjectFileModel).where(
            ProjectFileModel.project_id == project_id,
            ProjectFileModel.file_type == "extracted_text",
        )
        existing = db.execute(stmt).scalar_one_or_none()
        if existing is not None:
            existing.storage_path = storage_path
            existing.size = payload_size
        else:
            db.add(
                ProjectFileModel(
                    id=str(uuid.uuid4()),
                    project_id=project_id,
                    original_name="extracted_text.txt",
                    storage_path=storage_path,
                    size=payload_size,
                    mime_type="text/plain",
                    file_type="extracted_text",
                )
            )
        db.commit()
@classmethod
def get_extracted_text(cls, project_id: str, storage) -> Optional[str]:
    """Return the project's extracted text from storage, decoded as UTF-8.

    Returns None when ``storage.exists`` reports that
    ``projects/<project_id>/extracted_text.txt`` is absent.
    """
    storage_path = f"projects/{project_id}/extracted_text.txt"
    if storage.exists(storage_path):
        raw = storage.download(storage_path)
        return raw.decode("utf-8")
    return None
@staticmethod
def _to_dict(proj: ProjectModel) -> Dict[str, Any]:
    """Serialize a ProjectModel row into the dict shape callers expect.

    Timestamps are emitted as ISO-8601 strings, or None when the row has no
    value for them, so a row without timestamps does not raise.

    Args:
        proj: Project row to serialize.

    Returns:
        Dict of project fields. ``id`` and ``project_id`` carry the same value.
    """

    def _iso(moment: Optional[datetime]) -> Optional[str]:
        # A missing timestamp serializes as None instead of raising AttributeError.
        return moment.isoformat() if moment is not None else None

    return {
        "id": proj.id,
        "project_id": proj.id,  # kept for compatibility with existing code
        "name": proj.name,
        "status": proj.status,
        "analysis_summary": proj.analysis_summary,
        "simulation_requirement": proj.simulation_requirement,
        "chunk_size": proj.chunk_size,
        "chunk_overlap": proj.chunk_overlap,
        "active_task_id": proj.active_task_id,
        "created_at": _iso(proj.created_at),
        "updated_at": _iso(proj.updated_at),
        # Fields read from the old model: now empty placeholders, kept for compatibility.
        "files": [],
        "total_text_length": 0,
        "ontology": None,
        "graph_id": None,
        "graph_build_task_id": None,
        "error": None,
    }

View File

@ -0,0 +1,80 @@
# backend/tests/test_project_manager_db.py
import io
import pytest
import tempfile
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from backend.app.db import Base
import backend.app.db as db_module
from backend.app.storage.local import LocalFSStorage
import backend.app.models.db_models # ensure all ORM models are registered with Base.metadata
@pytest.fixture(autouse=True)
def isolated_db(tmp_path):
    """Point the db module at a fresh in-memory SQLite database for one test.

    Replaces ``db_module._engine`` and ``db_module._SessionLocal``, creates
    all tables registered on ``Base.metadata``, and yields to the test.
    On teardown the tables are dropped, the engine is disposed, and the
    module globals are restored to the values they had before the fixture ran.
    """
    # Remember what was configured so teardown restores it instead of
    # unconditionally leaving None behind.
    previous_engine = getattr(db_module, "_engine", None)
    previous_factory = getattr(db_module, "_SessionLocal", None)

    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    db_module._engine = engine
    db_module._SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
    Base.metadata.create_all(engine)
    try:
        yield
    finally:
        try:
            Base.metadata.drop_all(engine)
        finally:
            # Release pooled connections and undo the patching even if drop_all fails.
            engine.dispose()
            db_module._engine = previous_engine
            db_module._SessionLocal = previous_factory
@pytest.fixture
def storage(tmp_path):
    """Provide a LocalFSStorage built from the test's temporary directory path."""
    root_dir = str(tmp_path)
    return LocalFSStorage(root_dir)
def test_create_project(storage):
    """A new project echoes its name, has status "created" and carries an id."""
    import backend.app.models.project as project_module

    created = project_module.ProjectManager.create_project("Test Project", storage=storage)

    assert created["name"] == "Test Project"
    assert created["status"] == "created"
    assert "id" in created
def test_get_project(storage):
    """A project can be fetched by the id returned from create_project."""
    from backend.app.models.project import ProjectManager

    project_id = ProjectManager.create_project("My Project", storage=storage)["id"]
    fetched = ProjectManager.get_project(project_id)

    assert fetched is not None
    assert fetched["name"] == "My Project"
def test_project_not_found(storage):
    """Looking up an id that was never created yields None."""
    from backend.app.models.project import ProjectManager

    assert ProjectManager.get_project("nonexistent-id") is None
def test_save_and_get_extracted_text(storage):
    """Extracted text saved for a project is read back unchanged."""
    from backend.app.models.project import ProjectManager

    project_id = ProjectManager.create_project("Text Project", storage=storage)["id"]
    ProjectManager.save_extracted_text(project_id, "hello extracted", storage=storage)

    assert ProjectManager.get_extracted_text(project_id, storage=storage) == "hello extracted"
def test_project_survives_manager_reset(storage):
    """The data must live in the DB, not in memory held by ProjectManager."""
    import importlib

    import backend.app.models.project as project_module

    created = project_module.ProjectManager.create_project("Persist Me", storage=storage)

    # Re-executing the module produces a brand-new ProjectManager class, so
    # nothing cached on the old class can satisfy the lookup below.
    project_module = importlib.reload(project_module)

    fetched = project_module.ProjectManager.get_project(created["id"])
    assert fetched is not None
    assert fetched["name"] == "Persist Me"
def test_list_projects(storage):
    """list_projects returns every project that was created."""
    from backend.app.models.project import ProjectManager

    for project_name in ("P1", "P2"):
        ProjectManager.create_project(project_name, storage=storage)

    listed = ProjectManager.list_projects()
    assert len(listed) == 2
def test_delete_project(storage):
    """Deleting an existing project returns True and makes it unfetchable."""
    from backend.app.models.project import ProjectManager

    project_id = ProjectManager.create_project("Del Me", storage=storage)["id"]
    was_deleted = ProjectManager.delete_project(project_id, storage=storage)

    assert was_deleted is True
    assert ProjectManager.get_project(project_id) is None