feat(auth): AuthService — bcrypt hash, invitation and reset token CRUD

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-05-16 09:10:56 +00:00
parent 6a31af082d
commit 84fdb4ceaa
2 changed files with 232 additions and 0 deletions

View File

@ -0,0 +1,110 @@
"""Auth service: password hashing, token CRUD."""
import uuid
import bcrypt
from datetime import datetime, timezone, timedelta
from typing import Optional
from ..db import get_session
from ..models.db_models import UserModel, InvitationTokenModel, PasswordResetTokenModel
# Timing-safe: dummy hash prevents timing attacks when user not found
_DUMMY_HASH = bcrypt.hashpw(b'dummy', bcrypt.gensalt(12)).decode('utf-8')
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12)).decode('utf-8')
def verify_password(password: str, stored_hash: str) -> bool:
try:
return bcrypt.checkpw(password.encode('utf-8'), stored_hash.encode('utf-8'))
except Exception:
return False
def get_user_by_email(email: str) -> Optional[UserModel]:
from sqlalchemy import select
with get_session() as db:
user = db.execute(
select(UserModel).where(UserModel.email == email.strip().lower())
).scalar_one_or_none()
if user:
db.expunge(user)
return user
def create_invitation_token(user_id: str, ttl_hours: int = 48) -> str:
token = str(uuid.uuid4())
expires_at = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
with get_session() as db:
db.add(InvitationTokenModel(token=token, user_id=user_id, expires_at=expires_at))
db.commit()
return token
def get_user_by_invitation_token(token: str) -> Optional[UserModel]:
with get_session() as db:
rec = db.get(InvitationTokenModel, token)
if rec is None or rec.used_at is not None:
return None
expires = rec.expires_at
if expires.tzinfo is None:
expires = expires.replace(tzinfo=timezone.utc)
if expires < datetime.now(timezone.utc):
return None
user = db.get(UserModel, rec.user_id)
if user:
db.expunge(user)
return user
def consume_invitation_token(token: str, password: str) -> None:
"""Marca el token com usat, estableix contrasenya i activa l'usuari."""
with get_session() as db:
rec = db.get(InvitationTokenModel, token)
if rec is None:
return
rec.used_at = datetime.now(timezone.utc)
user = db.get(UserModel, rec.user_id)
if user:
user.password_hash = hash_password(password)
user.status = 'active'
db.commit()
def create_reset_token(user_id: str, ttl_hours: int = 1) -> str:
token = str(uuid.uuid4())
expires_at = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
with get_session() as db:
db.add(PasswordResetTokenModel(token=token, user_id=user_id, expires_at=expires_at))
db.commit()
return token
def get_user_by_reset_token(token: str) -> Optional[UserModel]:
with get_session() as db:
rec = db.get(PasswordResetTokenModel, token)
if rec is None or rec.used_at is not None:
return None
expires = rec.expires_at
if expires.tzinfo is None:
expires = expires.replace(tzinfo=timezone.utc)
if expires < datetime.now(timezone.utc):
return None
user = db.get(UserModel, rec.user_id)
if user:
db.expunge(user)
return user
def consume_reset_token(token: str, new_password: str) -> None:
"""Marca el token com usat i actualitza la contrasenya."""
with get_session() as db:
rec = db.get(PasswordResetTokenModel, token)
if rec is None:
return
rec.used_at = datetime.now(timezone.utc)
user = db.get(UserModel, rec.user_id)
if user:
user.password_hash = hash_password(new_password)
db.commit()

View File

@ -0,0 +1,122 @@
"""Tests unitaris per a AuthService."""
import pytest
from datetime import datetime, timezone, timedelta
@pytest.fixture(autouse=True)
def _db(in_memory_db):
pass
def test_hash_and_verify_password():
from backend.app.services.auth_service import hash_password, verify_password
h = hash_password("secret123")
assert h != "secret123"
assert verify_password("secret123", h) is True
assert verify_password("wrong", h) is False
def test_verify_wrong_hash_returns_false():
from backend.app.services.auth_service import verify_password
assert verify_password("any", "not-a-valid-hash") is False
def test_create_invitation_token(in_memory_db):
from backend.app.services.auth_service import create_invitation_token
from backend.app.models.db_models import UserModel
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="test@example.com", name="Test", role="user", status="pending")
db.add(user)
db.commit()
user_id = user.id
token = create_invitation_token(user_id, ttl_hours=1)
assert len(token) == 36 # UUID
def test_verify_valid_invitation_token(in_memory_db):
from backend.app.services.auth_service import create_invitation_token, get_user_by_invitation_token
from backend.app.models.db_models import UserModel
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="invite@example.com", name="Inv", role="user", status="pending")
db.add(user)
db.commit()
user_id = user.id
token = create_invitation_token(user_id, ttl_hours=1)
result = get_user_by_invitation_token(token)
assert result is not None
assert result.id == user_id
def test_verify_expired_invitation_token(in_memory_db):
from backend.app.services.auth_service import get_user_by_invitation_token
from backend.app.models.db_models import UserModel, InvitationTokenModel
from backend.app.db import get_session
import uuid
with get_session() as db:
user = UserModel(email="exp@example.com", name="Exp", role="user", status="pending")
db.add(user)
db.commit()
tok = InvitationTokenModel(
token=str(uuid.uuid4()),
user_id=user.id,
expires_at=datetime.now(timezone.utc) - timedelta(hours=1)
)
db.add(tok)
db.commit()
token_val = tok.token
result = get_user_by_invitation_token(token_val)
assert result is None
def test_create_and_verify_reset_token(in_memory_db):
from backend.app.services.auth_service import (
create_reset_token, get_user_by_reset_token, consume_reset_token
)
from backend.app.models.db_models import UserModel
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="reset@example.com", name="Reset", role="user", status="active",
password_hash="x")
db.add(user)
db.commit()
user_id = user.id
token = create_reset_token(user_id, ttl_hours=1)
u = get_user_by_reset_token(token)
assert u is not None
assert u.id == user_id
consume_reset_token(token, "newpassword123")
assert get_user_by_reset_token(token) is None
def test_set_password_activates_user(in_memory_db):
from backend.app.services.auth_service import (
create_invitation_token, consume_invitation_token
)
from backend.app.models.db_models import UserModel
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="act@example.com", name="Act", role="user", status="pending")
db.add(user)
db.commit()
user_id = user.id
token = create_invitation_token(user_id, ttl_hours=1)
consume_invitation_token(token, "mypassword")
with get_session() as db:
u = db.get(UserModel, user_id)
assert u.status == "active"
from backend.app.services.auth_service import verify_password
assert verify_password("mypassword", u.password_hash) is True