fix(storage,db): path traversal fix, delete_prefix validation, remove dead import, factory uses Config
- local.py: use relative_to() for path traversal guard (fixes prefix-collision false negative) - local.py: validate delete_prefix rejects empty/root prefix to prevent full-storage wipe - local.py: remove unused `import os` - db_models.py: remove dead UniqueConstraint import - db_models.py: replace deprecated datetime.utcnow() with datetime.now(timezone.utc) - factory.py: read STORAGE_TYPE and related settings from Config instead of os.environ directly; remove `import os` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
aab0cd355a
commit
868ce39577
|
|
@ -1,11 +1,11 @@
|
||||||
# backend/app/models/db_models.py
|
# backend/app/models/db_models.py
|
||||||
"""Models SQLAlchemy per a tota la persistència de MiroFish."""
|
"""Models SQLAlchemy per a tota la persistència de MiroFish."""
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime, timezone
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
String, Integer, Text, Boolean, DateTime, JSON,
|
String, Integer, Text, Boolean, DateTime, JSON,
|
||||||
ForeignKey, UniqueConstraint
|
ForeignKey
|
||||||
)
|
)
|
||||||
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||||
from ..db import Base
|
from ..db import Base
|
||||||
|
|
@ -16,7 +16,7 @@ def _uuid() -> str:
|
||||||
|
|
||||||
|
|
||||||
def _now() -> datetime:
|
def _now() -> datetime:
|
||||||
return datetime.utcnow()
|
return datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
class UserModel(Base):
|
class UserModel(Base):
|
||||||
|
|
|
||||||
|
|
@ -1,20 +1,18 @@
|
||||||
"""Selecciona la implementació de StorageService per STORAGE_TYPE."""
|
"""Selecciona la implementació de StorageService per STORAGE_TYPE."""
|
||||||
import os
|
|
||||||
from .protocol import StorageService
|
from .protocol import StorageService
|
||||||
|
|
||||||
|
|
||||||
def create_storage_service() -> StorageService:
|
def create_storage_service() -> StorageService:
|
||||||
storage_type = os.environ.get("STORAGE_TYPE", "local")
|
from app.config import Config
|
||||||
|
storage_type = Config.STORAGE_TYPE
|
||||||
match storage_type:
|
match storage_type:
|
||||||
case "azure":
|
case "azure":
|
||||||
from .azure_blob import AzureBlobStorage
|
from .azure_blob import AzureBlobStorage
|
||||||
conn_str = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
|
conn_str = Config.AZURE_STORAGE_CONNECTION_STRING
|
||||||
container = os.environ.get("AZURE_STORAGE_CONTAINER", "mirofish")
|
container = Config.AZURE_STORAGE_CONTAINER
|
||||||
if not conn_str:
|
if not conn_str:
|
||||||
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING no configurada per STORAGE_TYPE=azure")
|
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING no configurada per STORAGE_TYPE=azure")
|
||||||
return AzureBlobStorage(conn_str, container)
|
return AzureBlobStorage(conn_str, container)
|
||||||
case _:
|
case _:
|
||||||
from .local import LocalFSStorage
|
from .local import LocalFSStorage
|
||||||
base = os.environ.get("STORAGE_LOCAL_PATH",
|
return LocalFSStorage(Config.STORAGE_LOCAL_PATH)
|
||||||
os.path.join(os.path.dirname(__file__), "../../../uploads"))
|
|
||||||
return LocalFSStorage(base)
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
"""Adapter de storage per a filesystem local."""
|
"""Adapter de storage per a filesystem local."""
|
||||||
import io
|
import io
|
||||||
import os
|
|
||||||
import shutil
|
import shutil
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from .protocol import StorageService
|
from .protocol import StorageService
|
||||||
|
|
@ -16,7 +15,9 @@ class LocalFSStorage:
|
||||||
def _safe_path(self, relative: str) -> Path:
|
def _safe_path(self, relative: str) -> Path:
|
||||||
"""Resol el path i valida que estigui dins del base per evitar path traversal."""
|
"""Resol el path i valida que estigui dins del base per evitar path traversal."""
|
||||||
resolved = (self._base / relative).resolve()
|
resolved = (self._base / relative).resolve()
|
||||||
if not str(resolved).startswith(str(self._base)):
|
try:
|
||||||
|
resolved.relative_to(self._base)
|
||||||
|
except ValueError:
|
||||||
raise ValueError(f"Path traversal detectat: {relative!r}")
|
raise ValueError(f"Path traversal detectat: {relative!r}")
|
||||||
return resolved
|
return resolved
|
||||||
|
|
||||||
|
|
@ -41,6 +42,8 @@ class LocalFSStorage:
|
||||||
p.unlink()
|
p.unlink()
|
||||||
|
|
||||||
def delete_prefix(self, prefix: str) -> None:
|
def delete_prefix(self, prefix: str) -> None:
|
||||||
|
if not prefix or prefix in (".", "/"):
|
||||||
|
raise ValueError("prefix no pot ser buit ni arrel")
|
||||||
p = self._safe_path(prefix)
|
p = self._safe_path(prefix)
|
||||||
if p.is_dir():
|
if p.is_dir():
|
||||||
shutil.rmtree(p)
|
shutil.rmtree(p)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue