feat(auth): rewrite auth.py with real login, invite, forgot/reset password

Replaces demo-only login with bcrypt+JWT authentication endpoints:
login, refresh, logout, me, forgot-password, reset-password, invitation,
set-password. Adds JWTManager initialization and get_current_user helper.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubuntu 2026-05-16 09:14:24 +00:00
parent fc0dbddbdf
commit 949630100c
3 changed files with 314 additions and 15 deletions

View File

@ -12,6 +12,7 @@ warnings.filterwarnings("ignore", message=".*resource_tracker.*")
import jwt
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_jwt_extended import JWTManager
from .config import Config
from .utils.logger import setup_logger, get_logger
@ -37,6 +38,9 @@ def create_app(config_class=Config):
from .storage import create_storage_service
app.extensions['storage'] = create_storage_service()
# Inicialitzar JWT
JWTManager(app)
# Configure JSON encoding: ensure non-ASCII characters are output directly (not as \uXXXX)
# Flask >= 2.3 uses app.json.ensure_ascii; older versions use JSON_AS_ASCII config
if hasattr(app, 'json') and hasattr(app.json, 'ensure_ascii'):
@ -134,3 +138,25 @@ def get_storage():
"""Accés al StorageService des de qualsevol context Flask."""
from flask import current_app
return current_app.extensions['storage']
def get_current_user():
"""Retorna l'usuari autenticat via JWT Bearer token, o None si no hi ha token vàlid."""
from flask import request
from flask_jwt_extended import decode_token
from .db import get_session
from .models.db_models import UserModel
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return None
token = auth_header[len('Bearer '):]
try:
decoded = decode_token(token)
user_id = decoded.get('sub')
except Exception:
return None
with get_session() as db:
user = db.get(UserModel, user_id)
if user:
db.expunge(user)
return user

View File

@ -1,28 +1,138 @@
"""
Autenticació bàsica: POST /api/auth/login
Retorna JWT HS256 amb 24h d'expiració.
Si DEMO_PASSWORD és buida (no configurada), sempre retorna 401.
Autenticació real: login bcrypt+JWT, refresh, logout, me,
forgot-password, reset-password, invitation, set-password.
"""
import jwt
import datetime
import logging
from flask import request, jsonify, current_app
from flask_jwt_extended import (
create_access_token, create_refresh_token,
set_refresh_cookies, unset_jwt_cookies,
jwt_required, get_jwt_identity
)
from . import auth_bp
from ..db import get_session
from ..models.db_models import UserModel
from ..services.auth_service import (
verify_password, get_user_by_email, _DUMMY_HASH,
create_invitation_token, get_user_by_invitation_token, consume_invitation_token,
create_reset_token, get_user_by_reset_token, consume_reset_token
)
logger = logging.getLogger('mirofish.auth')
def _user_dto(user: UserModel) -> dict:
return {'id': user.id, 'email': user.email, 'name': user.name, 'role': user.role}
@auth_bp.route('/login', methods=['POST'])
def login():
data = request.get_json(silent=True) or {}
username = data.get('username', '')
email = data.get('email', '').strip().lower()
password = data.get('password', '')
expected = current_app.config.get('DEMO_PASSWORD', '')
if username != 'demo' or not expected or password != expected:
user = get_user_by_email(email)
candidate_hash = user.password_hash if (user and user.password_hash) else _DUMMY_HASH
valid = verify_password(password, candidate_hash)
if not valid or not user or user.status != 'active':
return jsonify({'success': False, 'error': 'Invalid credentials'}), 401
payload = {
'sub': username,
'iat': datetime.datetime.utcnow(),
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24),
}
token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
return jsonify({'success': True, 'token': token})
access_token = create_access_token(
identity=user.id,
additional_claims={'role': user.role, 'email': user.email}
)
refresh_token = create_refresh_token(identity=user.id)
response = jsonify({'success': True, 'token': access_token, 'user': _user_dto(user)})
set_refresh_cookies(response, refresh_token)
return response
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
user_id = get_jwt_identity()
with get_session() as db:
user = db.get(UserModel, user_id)
if not user or user.status != 'active':
return jsonify({'success': False, 'error': 'User not active'}), 401
db.expunge(user)
access_token = create_access_token(
identity=user.id,
additional_claims={'role': user.role, 'email': user.email}
)
return jsonify({'success': True, 'token': access_token, 'user': _user_dto(user)})
@auth_bp.route('/logout', methods=['POST'])
def logout():
response = jsonify({'success': True})
unset_jwt_cookies(response)
return response
@auth_bp.route('/me', methods=['GET'])
def me():
from .. import get_current_user
user = get_current_user()
if not user:
return jsonify({'success': False, 'error': 'Not authenticated'}), 401
return jsonify({'success': True, 'data': _user_dto(user)})
@auth_bp.route('/forgot-password', methods=['POST'])
def forgot_password():
data = request.get_json(silent=True) or {}
email = data.get('email', '').strip().lower()
user = get_user_by_email(email)
if user and user.status == 'active':
ttl = current_app.config.get('ACS_RESET_PASSWORD_TTL_HOURS', 1)
token = create_reset_token(user.id, ttl_hours=ttl)
reset_url = f"{request.host_url.rstrip('/')}/reset-password/{token}"
from ..services.email_service import send_reset_password_email
send_reset_password_email(user.email, reset_url)
return jsonify({'success': True, 'message': 'If the email exists, a reset link has been sent'}), 202
@auth_bp.route('/reset-password/<token>', methods=['GET'])
def get_reset_token(token):
user = get_user_by_reset_token(token)
if not user:
return jsonify({'success': False, 'error': 'Invalid or expired token'}), 404
return jsonify({'success': True, 'data': {'email': user.email}})
@auth_bp.route('/reset-password', methods=['POST'])
def reset_password():
data = request.get_json(silent=True) or {}
token = data.get('token', '')
password = data.get('password', '')
if len(password) < 8:
return jsonify({'success': False, 'error': 'Password must be at least 8 characters'}), 400
user = get_user_by_reset_token(token)
if not user:
return jsonify({'success': False, 'error': 'Invalid or expired token'}), 404
consume_reset_token(token, password)
return jsonify({'success': True})
@auth_bp.route('/invitation/<token>', methods=['GET'])
def get_invitation(token):
user = get_user_by_invitation_token(token)
if not user:
return jsonify({'success': False, 'error': 'Invalid or expired token'}), 404
return jsonify({'success': True, 'data': {'email': user.email, 'name': user.name}})
@auth_bp.route('/set-password', methods=['POST'])
def set_password():
data = request.get_json(silent=True) or {}
token = data.get('token', '')
password = data.get('password', '')
if len(password) < 8:
return jsonify({'success': False, 'error': 'Password must be at least 8 characters'}), 400
user = get_user_by_invitation_token(token)
if not user:
return jsonify({'success': False, 'error': 'Invalid or expired token'}), 404
consume_invitation_token(token, password)
return jsonify({'success': True})

View File

@ -0,0 +1,163 @@
"""Tests d'integració per als endpoints d'autenticació."""
import pytest
from unittest.mock import MagicMock, patch
@pytest.fixture
def app(in_memory_db):
import backend.app.db as db_module
saved_engine = db_module._engine
saved_session = db_module._SessionLocal
def _noop_init_db(url):
db_module._engine = saved_engine
db_module._SessionLocal = saved_session
with patch('backend.app.db.init_db', side_effect=_noop_init_db):
from backend.app import create_app
application = create_app()
application.config['TESTING'] = True
application.extensions['storage'] = MagicMock()
db_module._engine = saved_engine
db_module._SessionLocal = saved_session
return application
@pytest.fixture
def client(app):
with app.test_client() as c:
yield c
@pytest.fixture
def active_user(in_memory_db):
from backend.app.models.db_models import UserModel
from backend.app.services.auth_service import hash_password
from backend.app.db import get_session
with get_session() as db:
user = UserModel(
email="user@example.com",
name="Test User",
role="user",
status="active",
password_hash=hash_password("password123")
)
db.add(user)
db.commit()
db.refresh(user)
user_id = user.id
return user_id
def test_login_success(client, active_user):
res = client.post('/api/auth/login', json={
'email': 'user@example.com',
'password': 'password123'
})
assert res.status_code == 200
data = res.get_json()
assert data['success'] is True
assert 'token' in data
assert data['user']['email'] == 'user@example.com'
def test_login_wrong_password(client, active_user):
res = client.post('/api/auth/login', json={
'email': 'user@example.com',
'password': 'wrongpassword'
})
assert res.status_code == 401
assert res.get_json()['success'] is False
def test_login_nonexistent_user(client, in_memory_db):
res = client.post('/api/auth/login', json={
'email': 'nobody@example.com',
'password': 'password123'
})
assert res.status_code == 401
def test_login_pending_user_rejected(client, in_memory_db):
from backend.app.models.db_models import UserModel
from backend.app.services.auth_service import hash_password
from backend.app.db import get_session
with get_session() as db:
user = UserModel(
email="pending@example.com", name="P", role="user", status="pending",
password_hash=hash_password("pass")
)
db.add(user)
db.commit()
res = client.post('/api/auth/login', json={'email': 'pending@example.com', 'password': 'pass'})
assert res.status_code == 401
def test_forgot_password_always_202(client, in_memory_db):
res = client.post('/api/auth/forgot-password', json={'email': 'notexists@example.com'})
assert res.status_code == 202
res2 = client.post('/api/auth/forgot-password', json={'email': 'alsonotexists@example.com'})
assert res2.status_code == 202
def test_get_invitation_token_valid(client, in_memory_db):
from backend.app.models.db_models import UserModel
from backend.app.services.auth_service import create_invitation_token
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="inv@example.com", name="Inv", role="user", status="pending")
db.add(user)
db.commit()
user_id = user.id
token = create_invitation_token(user_id, ttl_hours=24)
res = client.get(f'/api/auth/invitation/{token}')
assert res.status_code == 200
assert res.get_json()['data']['email'] == 'inv@example.com'
def test_get_invitation_token_invalid(client, in_memory_db):
res = client.get('/api/auth/invitation/non-existent-token')
assert res.status_code == 404
def test_set_password_activates_user(client, in_memory_db):
from backend.app.models.db_models import UserModel
from backend.app.services.auth_service import create_invitation_token
from backend.app.db import get_session
with get_session() as db:
user = UserModel(email="setpwd@example.com", name="S", role="user", status="pending")
db.add(user)
db.commit()
user_id = user.id
token = create_invitation_token(user_id, ttl_hours=24)
res = client.post('/api/auth/set-password', json={'token': token, 'password': 'newpass123'})
assert res.status_code == 200
with get_session() as db:
u = db.get(UserModel, user_id)
assert u.status == 'active'
def test_reset_password_flow(client, in_memory_db):
from backend.app.models.db_models import UserModel
from backend.app.services.auth_service import hash_password, create_reset_token
from backend.app.db import get_session
with get_session() as db:
user = UserModel(
email="reset@example.com", name="R", role="user", status="active",
password_hash=hash_password("oldpass")
)
db.add(user)
db.commit()
user_id = user.id
token = create_reset_token(user_id, ttl_hours=1)
res = client.get(f'/api/auth/reset-password/{token}')
assert res.status_code == 200
assert res.get_json()['data']['email'] == 'reset@example.com'
res2 = client.post('/api/auth/reset-password', json={'token': token, 'password': 'newpass456'})
assert res2.status_code == 200
res3 = client.get(f'/api/auth/reset-password/{token}')
assert res3.status_code == 404