diff --git a/backend/app/services/auth_service.py b/backend/app/services/auth_service.py new file mode 100644 index 00000000..1f0fe5b5 --- /dev/null +++ b/backend/app/services/auth_service.py @@ -0,0 +1,110 @@ +"""Auth service: password hashing, token CRUD.""" +import uuid +import bcrypt +from datetime import datetime, timezone, timedelta +from typing import Optional + +from ..db import get_session +from ..models.db_models import UserModel, InvitationTokenModel, PasswordResetTokenModel + +# Timing-safe: dummy hash prevents timing attacks when user not found +_DUMMY_HASH = bcrypt.hashpw(b'dummy', bcrypt.gensalt(12)).decode('utf-8') + + +def hash_password(password: str) -> str: + return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12)).decode('utf-8') + + +def verify_password(password: str, stored_hash: str) -> bool: + try: + return bcrypt.checkpw(password.encode('utf-8'), stored_hash.encode('utf-8')) + except Exception: + return False + + +def get_user_by_email(email: str) -> Optional[UserModel]: + from sqlalchemy import select + with get_session() as db: + user = db.execute( + select(UserModel).where(UserModel.email == email.strip().lower()) + ).scalar_one_or_none() + if user: + db.expunge(user) + return user + + +def create_invitation_token(user_id: str, ttl_hours: int = 48) -> str: + token = str(uuid.uuid4()) + expires_at = datetime.now(timezone.utc) + timedelta(hours=ttl_hours) + with get_session() as db: + db.add(InvitationTokenModel(token=token, user_id=user_id, expires_at=expires_at)) + db.commit() + return token + + +def get_user_by_invitation_token(token: str) -> Optional[UserModel]: + with get_session() as db: + rec = db.get(InvitationTokenModel, token) + if rec is None or rec.used_at is not None: + return None + expires = rec.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + if expires < datetime.now(timezone.utc): + return None + user = db.get(UserModel, rec.user_id) + if user: + db.expunge(user) + return user + + +def consume_invitation_token(token: str, password: str) -> None: + """Marca el token com usat, estableix contrasenya i activa l'usuari.""" + with get_session() as db: + rec = db.get(InvitationTokenModel, token) + if rec is None: + return + rec.used_at = datetime.now(timezone.utc) + user = db.get(UserModel, rec.user_id) + if user: + user.password_hash = hash_password(password) + user.status = 'active' + db.commit() + + +def create_reset_token(user_id: str, ttl_hours: int = 1) -> str: + token = str(uuid.uuid4()) + expires_at = datetime.now(timezone.utc) + timedelta(hours=ttl_hours) + with get_session() as db: + db.add(PasswordResetTokenModel(token=token, user_id=user_id, expires_at=expires_at)) + db.commit() + return token + + +def get_user_by_reset_token(token: str) -> Optional[UserModel]: + with get_session() as db: + rec = db.get(PasswordResetTokenModel, token) + if rec is None or rec.used_at is not None: + return None + expires = rec.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + if expires < datetime.now(timezone.utc): + return None + user = db.get(UserModel, rec.user_id) + if user: + db.expunge(user) + return user + + +def consume_reset_token(token: str, new_password: str) -> None: + """Marca el token com usat i actualitza la contrasenya.""" + with get_session() as db: + rec = db.get(PasswordResetTokenModel, token) + if rec is None: + return + rec.used_at = datetime.now(timezone.utc) + user = db.get(UserModel, rec.user_id) + if user: + user.password_hash = hash_password(new_password) + db.commit() diff --git a/backend/tests/test_auth_service.py b/backend/tests/test_auth_service.py new file mode 100644 index 00000000..c0e078b5 --- /dev/null +++ b/backend/tests/test_auth_service.py @@ -0,0 +1,122 @@ +"""Tests unitaris per a AuthService.""" +import pytest +from datetime import datetime, timezone, timedelta + + +@pytest.fixture(autouse=True) +def _db(in_memory_db): + pass + + +def test_hash_and_verify_password(): + from backend.app.services.auth_service import hash_password, verify_password + h = hash_password("secret123") + assert h != "secret123" + assert verify_password("secret123", h) is True + assert verify_password("wrong", h) is False + + +def test_verify_wrong_hash_returns_false(): + from backend.app.services.auth_service import verify_password + assert verify_password("any", "not-a-valid-hash") is False + + +def test_create_invitation_token(in_memory_db): + from backend.app.services.auth_service import create_invitation_token + from backend.app.models.db_models import UserModel + from backend.app.db import get_session + + with get_session() as db: + user = UserModel(email="test@example.com", name="Test", role="user", status="pending") + db.add(user) + db.commit() + user_id = user.id + + token = create_invitation_token(user_id, ttl_hours=1) + assert len(token) == 36 # UUID + + +def test_verify_valid_invitation_token(in_memory_db): + from backend.app.services.auth_service import create_invitation_token, get_user_by_invitation_token + from backend.app.models.db_models import UserModel + from backend.app.db import get_session + + with get_session() as db: + user = UserModel(email="invite@example.com", name="Inv", role="user", status="pending") + db.add(user) + db.commit() + user_id = user.id + + token = create_invitation_token(user_id, ttl_hours=1) + result = get_user_by_invitation_token(token) + assert result is not None + assert result.id == user_id + + +def test_verify_expired_invitation_token(in_memory_db): + from backend.app.services.auth_service import get_user_by_invitation_token + from backend.app.models.db_models import UserModel, InvitationTokenModel + from backend.app.db import get_session + import uuid + + with get_session() as db: + user = UserModel(email="exp@example.com", name="Exp", role="user", status="pending") + db.add(user) + db.commit() + tok = InvitationTokenModel( + token=str(uuid.uuid4()), + user_id=user.id, + expires_at=datetime.now(timezone.utc) - timedelta(hours=1) + ) + db.add(tok) + db.commit() + token_val = tok.token + + result = get_user_by_invitation_token(token_val) + assert result is None + + +def test_create_and_verify_reset_token(in_memory_db): + from backend.app.services.auth_service import ( + create_reset_token, get_user_by_reset_token, consume_reset_token + ) + from backend.app.models.db_models import UserModel + from backend.app.db import get_session + + with get_session() as db: + user = UserModel(email="reset@example.com", name="Reset", role="user", status="active", + password_hash="x") + db.add(user) + db.commit() + user_id = user.id + + token = create_reset_token(user_id, ttl_hours=1) + u = get_user_by_reset_token(token) + assert u is not None + assert u.id == user_id + + consume_reset_token(token, "newpassword123") + assert get_user_by_reset_token(token) is None + + +def test_set_password_activates_user(in_memory_db): + from backend.app.services.auth_service import ( + create_invitation_token, consume_invitation_token + ) + from backend.app.models.db_models import UserModel + from backend.app.db import get_session + + with get_session() as db: + user = UserModel(email="act@example.com", name="Act", role="user", status="pending") + db.add(user) + db.commit() + user_id = user.id + + token = create_invitation_token(user_id, ttl_hours=1) + consume_invitation_token(token, "mypassword") + + with get_session() as db: + u = db.get(UserModel, user_id) + assert u.status == "active" + from backend.app.services.auth_service import verify_password + assert verify_password("mypassword", u.password_hash) is True