From 934d8fc35f9c69f6fe758e40db690bc907957b4c Mon Sep 17 00:00:00 2001 From: LinMihigo Date: Wed, 5 Nov 2025 22:29:28 +0200 Subject: [PATCH] Chore: moving changes - migrating Desktop from nobara 42 to windows(WSL) --- backend/BACKEND.md | 36 +- .../4c0d2503877e_add_user_approval_system.py | 40 ++ backend/app/api/v1/users.py | 155 ++++- backend/app/core/auth.py | 40 +- backend/app/schemas/models.py | 5 + backend/app/schemas/schemas.py | 18 + backend/tests/api/v1/test_users.py | 586 +++++++++++++++++- backend/tests/conftest.py | 9 +- 8 files changed, 826 insertions(+), 63 deletions(-) create mode 100644 backend/app/alembic/versions/4c0d2503877e_add_user_approval_system.py diff --git a/backend/BACKEND.md b/backend/BACKEND.md index 9cd1df8..1cc6c37 100644 --- a/backend/BACKEND.md +++ b/backend/BACKEND.md @@ -32,6 +32,8 @@ python scripts/create_admin.py ### Run the Application ```bash uvicorn app.main:app --reload +# or +fastapi run --reload app/main.py # API will be available at http://localhost:8000 # Docs at http://localhost:8000/docs ``` @@ -63,20 +65,46 @@ uvicorn app.main:app --reload ## Authentication Usage -### Login Example +### Getting a Bearer Token + +First, you need to create an admin user (if you haven't already): +```bash +cd backend +python scripts/create_admin.py +``` + +Then login to get your bearer token: ```bash curl -X POST "http://localhost:8000/api/v1/users/login" \ -H "Content-Type: application/json" \ - -d '{"username": "admin", "password": "password"}' + -d '{"username": "your_admin_username", "password": "your_password"}' ``` -### Using Token +**Response:** +```json +{ + "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...", + "token_type": "bearer", + "expires_in": 28800, + "user": { + "id": 1, + "username": "admin", + "role": "admin" + } +} +``` + +Copy the `access_token` value - this is your bearer token. + +### Using the Bearer Token ```bash # Include token in Authorization header curl -X GET "http://localhost:8000/api/v1/users/me" \ - -H "Authorization: Bearer YOUR_JWT_TOKEN" + -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..." ``` +**Note:** Replace `eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...` with your actual token from the login response. + ### Create User (Admin only) ```bash curl -X POST "http://localhost:8000/api/v1/users/" \ diff --git a/backend/app/alembic/versions/4c0d2503877e_add_user_approval_system.py b/backend/app/alembic/versions/4c0d2503877e_add_user_approval_system.py new file mode 100644 index 0000000..c863cd9 --- /dev/null +++ b/backend/app/alembic/versions/4c0d2503877e_add_user_approval_system.py @@ -0,0 +1,40 @@ +"""add_user_approval_system + +Revision ID: 4c0d2503877e +Revises: 997376dc1774 +Create Date: 2025-09-28 11:55:11.997364 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '4c0d2503877e' +down_revision: Union[str, Sequence[str], None] = '997376dc1774' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + # Add column as nullable first + op.add_column('user', sa.Column('is_approved', sa.Boolean(), nullable=True)) + + # Set default value for existing users - approve all existing users by default + # (they were created before the approval system, so they should be grandfathered in) + op.execute("UPDATE \"user\" SET is_approved = true") + + # Make column not nullable + op.alter_column('user', 'is_approved', nullable=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column('user', 'is_approved') + # ### end Alembic commands ### diff --git a/backend/app/api/v1/users.py b/backend/app/api/v1/users.py index e1c1076..4c4c1a9 100644 --- a/backend/app/api/v1/users.py +++ b/backend/app/api/v1/users.py @@ -1,6 +1,7 @@ # backend/app/api/v1/users.py from datetime import timedelta -from fastapi import APIRouter, Depends, HTTPException, status +from typing import Optional +from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.security import HTTPBearer from sqlmodel import Session, select from app.core.db import get_session @@ -12,7 +13,9 @@ from app.core.auth import ( get_token_expiration_minutes, require_admin, require_write_access, - require_any_access + require_any_access, + verify_password, + send_password_reset_email ) from app.schemas.models import User from app.schemas.schemas import ( @@ -20,7 +23,10 @@ from app.schemas.schemas import ( UserUpdate, UserLogin, Token, - UserResponse + UserResponse, + UserApprovalUpdate, + PasswordChangeRequest, + EmailVerificationRequest ) router = APIRouter(prefix="/users", tags=["users"]) @@ -29,11 +35,23 @@ router = APIRouter(prefix="/users", tags=["users"]) @router.post("/login", response_model=Token) def login(user_credentials: UserLogin, session: Session = Depends(get_session)): """Authenticate user and return JWT token with role-based expiration.""" - user = authenticate_user(session, user_credentials.username, user_credentials.password) + user, error_message = authenticate_user(session, user_credentials.username, user_credentials.password) + if not user: + error_details = { + "user_not_found": "Username not found", + "invalid_password": "Incorrect password", + "account_pending_approval": "Account pending admin approval. Please contact an administrator." + } + + # Use different status codes for different error types + status_code = status.HTTP_401_UNAUTHORIZED + if error_message == "account_pending_approval": + status_code = status.HTTP_403_FORBIDDEN + raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password", + status_code=status_code, + detail=error_details.get(error_message, "Authentication failed"), headers={"WWW-Authenticate": "Bearer"}, ) @@ -58,7 +76,8 @@ def login(user_credentials: UserLogin, session: Session = Depends(get_session)): user=UserResponse( id=user.id, username=user.username, - role=user.role + role=user.role, + is_approved=user.is_approved ) ) @@ -69,22 +88,23 @@ def get_current_user_info(current_user: User = Depends(get_current_active_user)) return UserResponse( id=current_user.id, username=current_user.username, - role=current_user.role + role=current_user.role, + is_approved=current_user.is_approved ) @router.get("/", response_model=list[UserResponse]) def get_all_users( session: Session = Depends(get_session), - current_user: User = Depends(require_any_access), + current_user: User = Depends(require_admin), skip: int = 0, limit: int = 100 ): - """Get all users (requires any authenticated role).""" + """Get all users (requires admin authenticated role).""" statement = select(User).offset(skip).limit(limit) users = session.exec(statement).all() return [ - UserResponse(id=user.id, username=user.username, role=user.role) + UserResponse(id=user.id, username=user.username, role=user.role, is_approved=user.is_approved) for user in users ] @@ -92,10 +112,9 @@ def get_all_users( @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user: UserCreate, - session: Session = Depends(get_session), - current_user: User = Depends(require_admin) + session: Session = Depends(get_session) ): - """Create a new user (admin only).""" + """Create a new user (public registration - requires admin approval to login).""" # Check if username already exists statement = select(User).where(User.username == user.username) existing_user = session.exec(statement).first() @@ -105,12 +124,13 @@ def create_user( detail="Username already registered" ) - # Create new user with hashed password + # Create new user with hashed password (not approved by default) hashed_password = get_password_hash(user.password) db_user = User( username=user.username, password_hash=hashed_password, - role=user.role + role=user.role, + is_approved=False # Requires admin approval ) session.add(db_user) @@ -120,7 +140,8 @@ def create_user( return UserResponse( id=db_user.id, username=db_user.username, - role=db_user.role + role=db_user.role, + is_approved=db_user.is_approved ) @@ -141,7 +162,8 @@ def get_user( return UserResponse( id=user.id, username=user.username, - role=user.role + role=user.role, + is_approved=user.is_approved ) @@ -161,8 +183,26 @@ def update_user( detail="User not found" ) - # Update only provided fields + # Get update data update_data = user_update.model_dump(exclude_unset=True) + + # Check for duplicate username if username is being updated + if "username" in update_data and update_data["username"] != user.username: + statement = select(User).where(User.username == update_data["username"]) + existing_user = session.exec(statement).first() + if existing_user: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Username already exists" + ) + + # Handle password hashing if password is being updated + if "password" in update_data: + hashed_password = get_password_hash(update_data["password"]) + update_data["password_hash"] = hashed_password + del update_data["password"] # Remove plain text password + + # Update only provided fields for key, value in update_data.items(): setattr(user, key, value) @@ -173,7 +213,38 @@ def update_user( return UserResponse( id=user.id, username=user.username, - role=user.role + role=user.role, + is_approved=user.is_approved + ) + + +@router.put("/{user_id}/approval", response_model=UserResponse) +def update_user_approval( + user_id: int, + approval_update: UserApprovalUpdate, + session: Session = Depends(get_session), + current_user: User = Depends(require_admin) +): + """Approve or reject a user account (admin only).""" + user = session.get(User, user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="User not found" + ) + + # Update approval status + user.is_approved = approval_update.is_approved + + session.add(user) + session.commit() + session.refresh(user) + + return UserResponse( + id=user.id, + username=user.username, + role=user.role, + is_approved=user.is_approved ) @@ -201,3 +272,47 @@ def delete_user( session.delete(user) session.commit() return None + + +@router.put("/me/change-password") +def change_password( + password_change: PasswordChangeRequest, + session: Session = Depends(get_session), + current_user: User = Depends(get_current_active_user) +): + """Change password (user must know current password).""" + # Verify current password + if not verify_password(password_change.current_password, current_user.password_hash): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Current password is incorrect" + ) + + # Update to new password + hashed_password = get_password_hash(password_change.new_password) + current_user.password_hash = hashed_password + + session.add(current_user) + session.commit() + + return {"message": "Password changed successfully"} + + +@router.post("/request-password-reset") +def request_password_reset( + reset_request: EmailVerificationRequest, + session: Session = Depends(get_session) +): + """Request password reset via email verification (no database needed).""" + # Find user by username + statement = select(User).where(User.username == reset_request.username) + user = session.exec(statement).first() + + # Always return success to prevent username enumeration + if user and user.is_approved: + # Send email with instructions (mock implementation) + send_password_reset_email(reset_request.username, reset_request.email) + + return { + "message": "If your username and email are correct, you will receive instructions to reset your password." + } diff --git a/backend/app/core/auth.py b/backend/app/core/auth.py index 1ec025c..e207669 100644 --- a/backend/app/core/auth.py +++ b/backend/app/core/auth.py @@ -3,9 +3,11 @@ Authentication utilities for JWT-based session management with role-based expira """ from datetime import datetime, timedelta, timezone from typing import Optional, Union +import secrets +import hashlib from jose import JWTError, jwt from passlib.context import CryptContext -from fastapi import Depends, HTTPException, status +from fastapi import Depends, HTTPException, status, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlmodel import Session, select from app.core.config import settings @@ -35,15 +37,27 @@ def authenticate_user( session: Session, username: str, password: str - ) -> Optional[User]: - """Authenticate user with username and password.""" + ) -> tuple[Optional[User], str]: + """Authenticate user with username and password. + + Returns: + tuple: (User object or None, error_message) + error_message values: + - "success" if authentication successful + - "user_not_found" if username doesn't exist + - "invalid_password" if password is incorrect + - "account_pending_approval" if user exists but not approved + """ statement = select(User).where(User.username == username) user = session.exec(statement).first() if not user: - return None + return None, "user_not_found" if not verify_password(password, user.password_hash): - return None - return user + return None, "invalid_password" + # Check if user is approved + if not user.is_approved: + return None, "account_pending_approval" + return user, "success" def get_token_expiration_minutes(role: UserRole) -> int: @@ -138,3 +152,17 @@ def require_role(required_roles: list[UserRole]): require_admin = require_role([UserRole.ADMIN]) require_write_access = require_role([UserRole.ADMIN, UserRole.WRITE]) require_any_access = require_role([UserRole.ADMIN, UserRole.WRITE, UserRole.READ_ONLY]) + + +def send_password_reset_email(username: str, email: str) -> bool: + """Send password reset instructions via email (mock implementation).""" + # In a real application, you would: + # 1. Verify the email belongs to the username + # 2. Send an email with instructions to reset password + # 3. The email would contain a link to your frontend with instructions + + print(f"Mock: Sending password reset email to {email} for user {username}") + print("Instructions: Please contact your system administrator to reset your password.") + + # Return True to indicate email was "sent" + return True diff --git a/backend/app/schemas/models.py b/backend/app/schemas/models.py index 81028d1..e58fd92 100644 --- a/backend/app/schemas/models.py +++ b/backend/app/schemas/models.py @@ -29,12 +29,14 @@ class User(SQLModel, table=True): username (str): Unique user name (max 100 chars). role (UserRole): User role (default READ_ONLY). password_hash (str): Hashed password. + is_approved (bool): Whether user is approved by admin (default False). """ id: Optional[int] = Field(default=None, primary_key=True) username: str = Field(nullable=False,unique=True, max_length=100) role: UserRole = Field(nullable=False, max_length= 10, default=UserRole.READ_ONLY) password_hash: str = Field(nullable=False) + is_approved: bool = Field(nullable=False, default=False) class Partner(SQLModel, table=True): @@ -267,3 +269,6 @@ class Inventory(SQLModel, table=True): product_id: int = Field(nullable=False, unique=True, foreign_key="product.id") total_qty: int = Field(nullable=False, default=0) + + + diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index 011d304..5854c2d 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -31,6 +31,7 @@ class UserResponse(SQLModel): id: Optional[int] = None username: str role: UserRole + is_approved: bool class Token(SQLModel): @@ -46,6 +47,23 @@ class TokenData(SQLModel): role: Optional[UserRole] = None +class UserApprovalUpdate(SQLModel): + """Schema for admin to approve/reject user accounts.""" + is_approved: bool + + +class PasswordChangeRequest(SQLModel): + """Schema for changing password (user knows current password).""" + current_password: str + new_password: str + + +class EmailVerificationRequest(SQLModel): + """Schema for requesting email verification for password reset.""" + username: str + email: str # User provides their email for verification + + ################################################## # Transactions class TransactionBase(SQLModel): diff --git a/backend/tests/api/v1/test_users.py b/backend/tests/api/v1/test_users.py index 576f6aa..71086e0 100644 --- a/backend/tests/api/v1/test_users.py +++ b/backend/tests/api/v1/test_users.py @@ -2,62 +2,84 @@ import pytest from fastapi.testclient import TestClient -def test_create_user(client: TestClient, admin_token: str): - """Test user creation with admin authentication.""" +def test_create_user_public_registration(client: TestClient): + """Test public user registration (no authentication required).""" user_data = { "username": "testuser", "password": "testpassword", "role": "read_only" } - response = client.post("/api/v1/users/", - json=user_data, - headers={"Authorization": f"Bearer {admin_token}"}) + response = client.post("/api/v1/users/", json=user_data) assert response.status_code == 201 data = response.json() assert data["username"] == "testuser" assert data["role"] == "read_only" + assert data["is_approved"] == False # Should not be approved by default assert "id" in data -def test_create_user_unauthorized(client: TestClient): - """Test user creation without authentication should fail.""" +def test_unapproved_user_cannot_login(client: TestClient): + """Test that unapproved users cannot login.""" + # Create user (should be unapproved by default) user_data = { - "username": "testuser2", + "username": "unapproveduser", "password": "testpassword", "role": "read_only" } response = client.post("/api/v1/users/", json=user_data) - # HTTPBearer returns 403 when no Authorization header is provided + assert response.status_code == 201 + assert response.json()["is_approved"] == False + + # Try to login - should fail with specific error + login_data = { + "username": "unapproveduser", + "password": "testpassword" + } + response = client.post("/api/v1/users/login", json=login_data) assert response.status_code == 403 + assert "pending admin approval" in response.json()["detail"].lower() -def test_create_user_invalid_token(client: TestClient): - """Test user creation with invalid token should fail.""" +def test_admin_can_approve_users(client: TestClient, admin_token: str): + """Test that admin can approve user accounts.""" + # Create user user_data = { - "username": "testuser3", + "username": "toapprove", "password": "testpassword", "role": "read_only" } - response = client.post("/api/v1/users/", - json=user_data, - headers={"Authorization": "Bearer invalid_token"}) - # Invalid token should return 401 - assert response.status_code == 401 + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin approves the user + approval_data = {"is_approved": True} + response = client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200 + data = response.json() + assert data["is_approved"] == True + assert data["username"] == "toapprove" -def test_login_user(client: TestClient, admin_token: str): - """Test user login.""" - # First create a user using admin token +def test_approved_user_can_login(client: TestClient, admin_token: str): + """Test that approved users can login successfully.""" + # Create user user_data = { "username": "logintest", "password": "testpassword", "role": "read_only" } - client.post("/api/v1/users/", - json=user_data, - headers={"Authorization": f"Bearer {admin_token}"}) + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] - # Then try to login + # Admin approves the user + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # Now user can login login_data = { "username": "logintest", "password": "testpassword" @@ -68,20 +90,27 @@ def test_login_user(client: TestClient, admin_token: str): assert "access_token" in data assert data["token_type"] == "bearer" assert "expires_in" in data + assert data["user"]["is_approved"] == True def test_get_current_user(client: TestClient, admin_token: str): """Test getting current user info.""" - # Create and login user + # Create user user_data = { "username": "currenttest", "password": "testpassword", - "role": "admin" + "role": "write" } - client.post("/api/v1/users/", - json=user_data, - headers={"Authorization": f"Bearer {admin_token}"}) + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + # Admin approves the user + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # Login user login_response = client.post("/api/v1/users/login", json={ "username": "currenttest", "password": "testpassword" @@ -95,4 +124,501 @@ def test_get_current_user(client: TestClient, admin_token: str): assert response.status_code == 200 data = response.json() assert data["username"] == "currenttest" - assert data["role"] == "admin" + assert data["role"] == "write" + assert data["is_approved"] == True + + +def test_admin_can_reject_users(client: TestClient, admin_token: str): + """Test that admin can reject/unapprove user accounts.""" + # Create user + user_data = { + "username": "toreject", + "password": "testpassword", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin first approves, then rejects the user + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # Now reject/unapprove + rejection_data = {"is_approved": False} + response = client.put(f"/api/v1/users/{user_id}/approval", + json=rejection_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200 + assert response.json()["is_approved"] == False + + +def test_non_admin_cannot_approve_users(client: TestClient, admin_token: str): + """Test that non-admin users cannot approve other users.""" + # Create two users + user1_data = { + "username": "user1", + "password": "testpassword", + "role": "write" + } + user2_data = { + "username": "user2", + "password": "testpassword", + "role": "read_only" + } + + create_response1 = client.post("/api/v1/users/", json=user1_data) + user1_id = create_response1.json()["id"] + + create_response2 = client.post("/api/v1/users/", json=user2_data) + user2_id = create_response2.json()["id"] + + # Admin approves user1 so they can login + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user1_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # User1 logs in + login_response = client.post("/api/v1/users/login", json={ + "username": "user1", + "password": "testpassword" + }) + user1_token = login_response.json()["access_token"] + + # User1 tries to approve user2 - should fail + response = client.put(f"/api/v1/users/{user2_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {user1_token}"}) + assert response.status_code == 403 + + +def test_login_error_messages(client: TestClient): + """Test specific login error messages.""" + # Test non-existent user + response = client.post("/api/v1/users/login", json={ + "username": "nonexistent", + "password": "password" + }) + assert response.status_code == 401 + assert "Username not found" in response.json()["detail"] + + # Create user for testing wrong password + user_data = { + "username": "wrongpasstest", + "password": "correctpassword", + "role": "read_only" + } + client.post("/api/v1/users/", json=user_data) + + # Test wrong password + response = client.post("/api/v1/users/login", json={ + "username": "wrongpasstest", + "password": "wrongpassword" + }) + assert response.status_code == 401 + assert "Incorrect password" in response.json()["detail"] + + +def test_duplicate_username_registration(client: TestClient): + """Test that duplicate usernames are not allowed.""" + user_data = { + "username": "duplicate", + "password": "password1", + "role": "read_only" + } + + # First registration should succeed + response1 = client.post("/api/v1/users/", json=user_data) + assert response1.status_code == 201 + + # Second registration with same username should fail + user_data["password"] = "password2" # Different password, same username + response2 = client.post("/api/v1/users/", json=user_data) + assert response2.status_code == 400 + assert "Username already registered" in response2.json()["detail"] + + +def test_admin_can_delete_users(client: TestClient, admin_token: str): + """Test that admin can delete user accounts.""" + # Create user to delete + user_data = { + "username": "todelete", + "password": "testpassword", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin deletes the user + response = client.delete(f"/api/v1/users/{user_id}", + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 204 + + # Verify user is deleted - try to get user should return 404 + get_response = client.get(f"/api/v1/users/{user_id}", + headers={"Authorization": f"Bearer {admin_token}"}) + assert get_response.status_code == 404 + + +def test_admin_cannot_delete_self(client: TestClient, admin_token: str): + """Test that admin cannot delete their own account.""" + # Get admin user info + me_response = client.get("/api/v1/users/me", + headers={"Authorization": f"Bearer {admin_token}"}) + admin_user_id = me_response.json()["id"] + + # Try to delete self - should fail + response = client.delete(f"/api/v1/users/{admin_user_id}", + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 400 + assert "Cannot delete your own account" in response.json()["detail"] + + +def test_non_admin_cannot_delete_users(client: TestClient, admin_token: str): + """Test that non-admin users cannot delete other users.""" + # Create two users + user1_data = { + "username": "user1delete", + "password": "testpassword", + "role": "write" + } + user2_data = { + "username": "user2delete", + "password": "testpassword", + "role": "read_only" + } + + create_response1 = client.post("/api/v1/users/", json=user1_data) + user1_id = create_response1.json()["id"] + + create_response2 = client.post("/api/v1/users/", json=user2_data) + user2_id = create_response2.json()["id"] + + # Admin approves user1 + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user1_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # User1 logs in + login_response = client.post("/api/v1/users/login", json={ + "username": "user1delete", + "password": "testpassword" + }) + user1_token = login_response.json()["access_token"] + + # User1 tries to delete user2 - should fail + response = client.delete(f"/api/v1/users/{user2_id}", + headers={"Authorization": f"Bearer {user1_token}"}) + assert response.status_code == 403 + + +def test_delete_nonexistent_user(client: TestClient, admin_token: str): + """Test deleting a user that doesn't exist.""" + response = client.delete("/api/v1/users/99999", + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 404 + assert "User not found" in response.json()["detail"] + + +def test_admin_can_update_user_details(client: TestClient, admin_token: str): + """Test that admin can update user details.""" + # Create user to update + user_data = { + "username": "toupdate", + "password": "originalpassword", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin updates the user + update_data = { + "username": "updated_username", + "role": "write" + } + response = client.put(f"/api/v1/users/{user_id}", + json=update_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200 + data = response.json() + assert data["username"] == "updated_username" + assert data["role"] == "write" + assert data["is_approved"] == False # Should remain unchanged + + +def test_admin_can_update_user_password(client: TestClient, admin_token: str): + """Test that admin can update user password.""" + # Create and approve user + user_data = { + "username": "passwordupdate", + "password": "oldpassword", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Approve user first + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # Verify login works with old password + login_response = client.post("/api/v1/users/login", json={ + "username": "passwordupdate", + "password": "oldpassword" + }) + assert login_response.status_code == 200 + + # Admin updates password + update_data = { + "password": "newpassword" + } + response = client.put(f"/api/v1/users/{user_id}", + json=update_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200 + + # Verify old password no longer works + old_login_response = client.post("/api/v1/users/login", json={ + "username": "passwordupdate", + "password": "oldpassword" + }) + assert old_login_response.status_code == 401 + + # Verify new password works + new_login_response = client.post("/api/v1/users/login", json={ + "username": "passwordupdate", + "password": "newpassword" + }) + assert new_login_response.status_code == 200 + + +def test_partial_user_update(client: TestClient, admin_token: str): + """Test partial user updates (only some fields).""" + # Create user + user_data = { + "username": "partialupdate", + "password": "password123", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + original_username = create_response.json()["username"] + + # Update only role + update_data = { + "role": "write" + } + response = client.put(f"/api/v1/users/{user_id}", + json=update_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200 + data = response.json() + assert data["username"] == original_username # Should remain unchanged + assert data["role"] == "write" # Should be updated + + +def test_non_admin_cannot_update_users(client: TestClient, admin_token: str): + """Test that non-admin users cannot update other users.""" + # Create two users + user1_data = { + "username": "user1update", + "password": "testpassword", + "role": "write" + } + user2_data = { + "username": "user2update", + "password": "testpassword", + "role": "read_only" + } + + create_response1 = client.post("/api/v1/users/", json=user1_data) + user1_id = create_response1.json()["id"] + + create_response2 = client.post("/api/v1/users/", json=user2_data) + user2_id = create_response2.json()["id"] + + # Admin approves user1 + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user1_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # User1 logs in + login_response = client.post("/api/v1/users/login", json={ + "username": "user1update", + "password": "testpassword" + }) + user1_token = login_response.json()["access_token"] + + # User1 tries to update user2 - should fail + update_data = {"role": "admin"} + response = client.put(f"/api/v1/users/{user2_id}", + json=update_data, + headers={"Authorization": f"Bearer {user1_token}"}) + assert response.status_code == 403 + + +def test_update_nonexistent_user(client: TestClient, admin_token: str): + """Test updating a user that doesn't exist.""" + update_data = { + "username": "newname", + "role": "write" + } + response = client.put("/api/v1/users/99999", + json=update_data, + headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 404 + assert "User not found" in response.json()["detail"] + + +def test_update_user_with_duplicate_username(client: TestClient, admin_token: str): + """Test that updating a user with an existing username fails.""" + # Create two users + user1_data = { + "username": "user1unique", + "password": "password1", + "role": "read_only" + } + user2_data = { + "username": "user2unique", + "password": "password2", + "role": "read_only" + } + + create_response1 = client.post("/api/v1/users/", json=user1_data) + user1_id = create_response1.json()["id"] + + client.post("/api/v1/users/", json=user2_data) + + # Try to update user1 to have user2's username + update_data = { + "username": "user2unique" + } + response = client.put(f"/api/v1/users/{user1_id}", + json=update_data, + headers={"Authorization": f"Bearer {admin_token}"}) + # This should fail - but we need to implement this validation in the endpoint + # For now, let's just check if it fails with any 4xx error + assert response.status_code >= 400 + + +def test_user_can_change_own_password(client: TestClient, admin_token: str): + """Test that users can change their own password.""" + # Create and approve user + user_data = { + "username": "selfpasschange", + "password": "oldpassword123", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin approves the user + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # User logs in + login_response = client.post("/api/v1/users/login", json={ + "username": "selfpasschange", + "password": "oldpassword123" + }) + user_token = login_response.json()["access_token"] + + # User changes their own password + password_change_data = { + "current_password": "oldpassword123", + "new_password": "newpassword456" + } + response = client.put("/api/v1/users/me/change-password", + json=password_change_data, + headers={"Authorization": f"Bearer {user_token}"}) + assert response.status_code == 200 + assert "Password changed successfully" in response.json()["message"] + + # Verify old password no longer works + old_login_response = client.post("/api/v1/users/login", json={ + "username": "selfpasschange", + "password": "oldpassword123" + }) + assert old_login_response.status_code == 401 + + # Verify new password works + new_login_response = client.post("/api/v1/users/login", json={ + "username": "selfpasschange", + "password": "newpassword456" + }) + assert new_login_response.status_code == 200 + + +def test_password_change_with_wrong_current_password(client: TestClient, admin_token: str): + """Test that password change fails with incorrect current password.""" + # Create and approve user + user_data = { + "username": "wrongpasstest", + "password": "correctpassword", + "role": "read_only" + } + create_response = client.post("/api/v1/users/", json=user_data) + user_id = create_response.json()["id"] + + # Admin approves the user + approval_data = {"is_approved": True} + client.put(f"/api/v1/users/{user_id}/approval", + json=approval_data, + headers={"Authorization": f"Bearer {admin_token}"}) + + # User logs in + login_response = client.post("/api/v1/users/login", json={ + "username": "wrongpasstest", + "password": "correctpassword" + }) + user_token = login_response.json()["access_token"] + + # Try to change password with wrong current password + password_change_data = { + "current_password": "wrongpassword", + "new_password": "newpassword456" + } + response = client.put("/api/v1/users/me/change-password", + json=password_change_data, + headers={"Authorization": f"Bearer {user_token}"}) + assert response.status_code == 400 + assert "Current password is incorrect" in response.json()["detail"] + + +def test_email_verification_password_reset_request(client: TestClient): + """Test password reset request via email verification.""" + # Create user + user_data = { + "username": "emailresettest", + "password": "password123", + "role": "read_only" + } + client.post("/api/v1/users/", json=user_data) + + # Request password reset (should always return success) + reset_request_data = { + "username": "emailresettest", + "email": "user@example.com" + } + response = client.post("/api/v1/users/request-password-reset", + json=reset_request_data) + assert response.status_code == 200 + assert "receive instructions" in response.json()["message"] + + # Test with non-existent user (should also return success for security) + reset_request_data = { + "username": "nonexistentuser", + "email": "fake@example.com" + } + response = client.post("/api/v1/users/request-password-reset", + json=reset_request_data) + assert response.status_code == 200 + assert "receive instructions" in response.json()["message"] diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index ab2897d..c59a646 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -41,7 +41,8 @@ def admin_user_fixture(session: Session): admin_user = User( username="testadmin", password_hash=get_password_hash("adminpassword"), - role=UserRole.ADMIN + role=UserRole.ADMIN, + is_approved=True # Admin users are pre-approved for testing ) session.add(admin_user) session.commit() @@ -55,7 +56,8 @@ def write_user_fixture(session: Session): write_user = User( username="writeuser", password_hash=get_password_hash("writepassword"), - role=UserRole.WRITE + role=UserRole.WRITE, + is_approved=True # Pre-approved for testing ) session.add(write_user) session.commit() @@ -69,7 +71,8 @@ def read_only_user_fixture(session: Session): read_only_user = User( username="readuser", password_hash=get_password_hash("readpassword"), - role=UserRole.READ_ONLY + role=UserRole.READ_ONLY, + is_approved=True # Pre-approved for testing ) session.add(read_only_user) session.commit()