feat(storage): add StorageService protocol, LocalFSStorage, AzureBlobStorage, factory

This commit is contained in:
Ubuntu 2026-05-02 23:53:19 +00:00
parent ca2f9e2a8f
commit 9e33f823d9
6 changed files with 235 additions and 0 deletions

View File

@ -0,0 +1,5 @@
"""Storage service for file management."""
from .protocol import StorageService
from .factory import create_storage_service
__all__ = ["StorageService", "create_storage_service"]

View File

@ -0,0 +1,53 @@
"""Adapter de storage per a Azure Blob Storage."""
import io
from .protocol import StorageService
class AzureBlobStorage:
"""Implementació de StorageService per a Azure Blob Storage."""
def __init__(self, connection_string: str, container_name: str) -> None:
from azure.storage.blob import BlobServiceClient
self._client = BlobServiceClient.from_connection_string(connection_string)
self._container = container_name
self._ensure_container()
def _ensure_container(self) -> None:
container_client = self._client.get_container_client(self._container)
if not container_client.exists():
container_client.create_container()
def _blob_client(self, path: str):
return self._client.get_blob_client(container=self._container, blob=path)
def upload(self, path: str, data: bytes | io.IOBase, content_type: str = "application/octet-stream") -> None:
blob = self._blob_client(path)
if isinstance(data, bytes):
blob.upload_blob(data, overwrite=True, content_settings={"content_type": content_type})
else:
blob.upload_blob(data, overwrite=True, content_settings={"content_type": content_type})
def download(self, path: str) -> bytes:
return self._blob_client(path).download_blob().readall()
def download_stream(self, path: str) -> io.BytesIO:
return io.BytesIO(self.download(path))
def delete(self, path: str) -> None:
self._blob_client(path).delete_blob(delete_snapshots="include")
def delete_prefix(self, prefix: str) -> None:
container = self._client.get_container_client(self._container)
blobs = container.list_blobs(name_starts_with=prefix)
for blob in blobs:
container.delete_blob(blob.name, delete_snapshots="include")
def exists(self, path: str) -> bool:
return self._blob_client(path).exists()
def list(self, prefix: str = "") -> list[str]:
container = self._client.get_container_client(self._container)
return [b.name for b in container.list_blobs(name_starts_with=prefix)]
def public_url(self, path: str) -> str | None:
return self._blob_client(path).url

View File

@ -0,0 +1,20 @@
"""Selecciona la implementació de StorageService per STORAGE_TYPE."""
import os
from .protocol import StorageService
def create_storage_service() -> StorageService:
storage_type = os.environ.get("STORAGE_TYPE", "local")
match storage_type:
case "azure":
from .azure_blob import AzureBlobStorage
conn_str = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "")
container = os.environ.get("AZURE_STORAGE_CONTAINER", "mirofish")
if not conn_str:
raise RuntimeError("AZURE_STORAGE_CONNECTION_STRING no configurada per STORAGE_TYPE=azure")
return AzureBlobStorage(conn_str, container)
case _:
from .local import LocalFSStorage
base = os.environ.get("STORAGE_LOCAL_PATH",
os.path.join(os.path.dirname(__file__), "../../../uploads"))
return LocalFSStorage(base)

View File

@ -0,0 +1,64 @@
"""Adapter de storage per a filesystem local."""
import io
import os
import shutil
from pathlib import Path
from .protocol import StorageService
class LocalFSStorage:
"""Implementació de StorageService per a filesystem local."""
def __init__(self, base_path: str) -> None:
self._base = Path(base_path).resolve()
self._base.mkdir(parents=True, exist_ok=True)
def _safe_path(self, relative: str) -> Path:
"""Resol el path i valida que estigui dins del base per evitar path traversal."""
resolved = (self._base / relative).resolve()
if not str(resolved).startswith(str(self._base)):
raise ValueError(f"Path traversal detectat: {relative!r}")
return resolved
def upload(self, path: str, data: bytes | io.IOBase, content_type: str = "application/octet-stream") -> None:
dest = self._safe_path(path)
dest.parent.mkdir(parents=True, exist_ok=True)
if isinstance(data, bytes):
dest.write_bytes(data)
else:
with open(dest, "wb") as f:
shutil.copyfileobj(data, f)
def download(self, path: str) -> bytes:
return self._safe_path(path).read_bytes()
def download_stream(self, path: str) -> io.BytesIO:
return io.BytesIO(self.download(path))
def delete(self, path: str) -> None:
p = self._safe_path(path)
if p.exists():
p.unlink()
def delete_prefix(self, prefix: str) -> None:
p = self._safe_path(prefix)
if p.is_dir():
shutil.rmtree(p)
elif p.exists():
p.unlink()
def exists(self, path: str) -> bool:
return self._safe_path(path).exists()
def list(self, prefix: str = "") -> list[str]:
base = self._safe_path(prefix) if prefix else self._base
if not base.exists():
return []
result = []
for p in base.rglob("*"):
if p.is_file():
result.append(str(p.relative_to(self._base)))
return result
def public_url(self, path: str) -> str | None:
return None

View File

@ -0,0 +1,32 @@
"""Interfície abstracta per a la capa de storage de fitxers."""
from typing import IO, Iterator, Protocol, runtime_checkable
@runtime_checkable
class StorageService(Protocol):
def upload(self, path: str, data: bytes | IO, content_type: str = "application/octet-stream") -> None:
...
def download(self, path: str) -> bytes:
...
def download_stream(self, path: str) -> IO:
...
def delete(self, path: str) -> None:
...
def delete_prefix(self, prefix: str) -> None:
"""Esborra tots els fitxers que comencen per prefix."""
...
def exists(self, path: str) -> bool:
...
def list(self, prefix: str = "") -> list[str]:
"""Retorna paths relatius sota el prefix."""
...
def public_url(self, path: str) -> str | None:
"""URL pública si el backend ho suporta, None si no."""
...

View File

@ -0,0 +1,61 @@
"""Tests para la StorageService."""
import io
import pytest
import tempfile
import os
from backend.app.storage.local import LocalFSStorage
@pytest.fixture
def storage(tmp_path):
return LocalFSStorage(str(tmp_path))
def test_upload_and_download_bytes(storage):
storage.upload("foo/bar.txt", b"hello world", "text/plain")
assert storage.download("foo/bar.txt") == b"hello world"
def test_upload_and_download_stream(storage):
data = io.BytesIO(b"stream data")
storage.upload("test/stream.bin", data)
result = storage.download("test/stream.bin")
assert result == b"stream data"
def test_exists(storage):
assert not storage.exists("not/there.txt")
storage.upload("yes.txt", b"x")
assert storage.exists("yes.txt")
def test_delete(storage):
storage.upload("del.txt", b"bye")
storage.delete("del.txt")
assert not storage.exists("del.txt")
def test_delete_prefix(storage):
storage.upload("dir/a.txt", b"a")
storage.upload("dir/b.txt", b"b")
storage.delete_prefix("dir")
assert not storage.exists("dir/a.txt")
assert not storage.exists("dir/b.txt")
def test_list(storage):
storage.upload("root/x.txt", b"x")
storage.upload("root/y.txt", b"y")
paths = storage.list("root")
assert len(paths) == 2
assert all("root" in p for p in paths)
def test_path_traversal_blocked(storage):
with pytest.raises(ValueError, match="Path traversal"):
storage._safe_path("../../etc/passwd")
def test_public_url_is_none(storage):
storage.upload("f.txt", b"x")
assert storage.public_url("f.txt") is None