# backend/app/api/v1/users.py from datetime import timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.security import HTTPBearer from sqlmodel import Session, select from app.core.db import get_session from app.core.auth import ( authenticate_user, create_access_token, get_password_hash, get_current_active_user, get_token_expiration_minutes, require_admin, require_write_access, require_any_access, verify_password, send_password_reset_email ) from app.schemas.models import User from app.schemas.schemas import ( UserCreate, UserUpdate, UserLogin, Token, UserResponse, UserApprovalUpdate, PasswordChangeRequest, EmailVerificationRequest ) router = APIRouter(prefix="/users", tags=["users"]) @router.post("/login", response_model=Token) def login(user_credentials: UserLogin, session: Session = Depends(get_session)): """Authenticate user and return JWT token with role-based expiration.""" user, error_message = authenticate_user(session, user_credentials.username, user_credentials.password) if not user: error_details = { "user_not_found": "Username not found", "invalid_password": "Incorrect password", "account_pending_approval": "Account pending admin approval. Please contact an administrator." } # Use different status codes for different error types status_code = status.HTTP_401_UNAUTHORIZED if error_message == "account_pending_approval": status_code = status.HTTP_403_FORBIDDEN raise HTTPException( status_code=status_code, detail=error_details.get(error_message, "Authentication failed"), headers={"WWW-Authenticate": "Bearer"}, ) # Get role-based expiration time expire_minutes = get_token_expiration_minutes(user.role) access_token_expires = timedelta(minutes=expire_minutes) # Create token with user data access_token = create_access_token( data={ "sub": user.username, "user_id": user.id, "role": user.role.value }, expires_delta=access_token_expires ) return Token( access_token=access_token, token_type="bearer", expires_in=expire_minutes * 60, # Convert to seconds user=UserResponse( id=user.id, username=user.username, role=user.role, is_approved=user.is_approved ) ) @router.get("/me", response_model=UserResponse) def get_current_user_info(current_user: User = Depends(get_current_active_user)): """Get current user information from token.""" return UserResponse( id=current_user.id, username=current_user.username, role=current_user.role, is_approved=current_user.is_approved ) @router.get("/", response_model=list[UserResponse]) def get_all_users( session: Session = Depends(get_session), current_user: User = Depends(require_admin), skip: int = 0, limit: int = 100 ): """Get all users (requires admin authenticated role).""" statement = select(User).offset(skip).limit(limit) users = session.exec(statement).all() return [ UserResponse(id=user.id, username=user.username, role=user.role, is_approved=user.is_approved) for user in users ] @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user: UserCreate, session: Session = Depends(get_session) ): """Create a new user (public registration - requires admin approval to login).""" # Check if username already exists statement = select(User).where(User.username == user.username) existing_user = session.exec(statement).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered" ) # Create new user with hashed password (not approved by default) hashed_password = get_password_hash(user.password) db_user = User( username=user.username, password_hash=hashed_password, role=user.role, is_approved=False # Requires admin approval ) session.add(db_user) session.commit() session.refresh(db_user) return UserResponse( id=db_user.id, username=db_user.username, role=db_user.role, is_approved=db_user.is_approved ) @router.get("/{user_id}", response_model=UserResponse) def get_user( user_id: int, session: Session = Depends(get_session), current_user: User = Depends(require_any_access) ): """Get specific user by ID (requires authentication).""" user = session.get(User, user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return UserResponse( id=user.id, username=user.username, role=user.role, is_approved=user.is_approved ) # Update to handle user self-updating password @router.put("/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_update: UserUpdate, session: Session = Depends(get_session), current_user: UserResponse = Depends(require_admin) ): """Update specific user (admin only).""" user = session.get(User, user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Get update data update_data = user_update.model_dump(exclude_unset=True) # Check for duplicate username if username is being updated if "username" in update_data and update_data["username"] != user.username: statement = select(User).where(User.username == update_data["username"]) existing_user = session.exec(statement).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Handle password hashing if password is being updated if "password" in update_data: hashed_password = get_password_hash(update_data["password"]) update_data["password_hash"] = hashed_password del update_data["password"] # Remove plain text password # Update only provided fields for key, value in update_data.items(): setattr(user, key, value) session.add(user) session.commit() session.refresh(user) return UserResponse( id=user.id, username=user.username, role=user.role, is_approved=user.is_approved ) @router.put("/{user_id}/approval", response_model=UserResponse) def update_user_approval( user_id: int, approval_update: UserApprovalUpdate, session: Session = Depends(get_session), current_user: User = Depends(require_admin) ): """Approve or reject a user account (admin only).""" user = session.get(User, user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Update approval status user.is_approved = approval_update.is_approved session.add(user) session.commit() session.refresh(user) return UserResponse( id=user.id, username=user.username, role=user.role, is_approved=user.is_approved ) @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( user_id: int, session: Session = Depends(get_session), current_user: User = Depends(require_admin) ): """Delete specific user (admin only).""" user = session.get(User, user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Prevent self-deletion if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account" ) session.delete(user) session.commit() return None @router.put("/me/change-password") def change_password( password_change: PasswordChangeRequest, session: Session = Depends(get_session), current_user: User = Depends(get_current_active_user) ): """Change password (user must know current password).""" # Verify current password if not verify_password(password_change.current_password, current_user.password_hash): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect" ) # Update to new password hashed_password = get_password_hash(password_change.new_password) current_user.password_hash = hashed_password session.add(current_user) session.commit() return {"message": "Password changed successfully"} @router.post("/request-password-reset") def request_password_reset( reset_request: EmailVerificationRequest, session: Session = Depends(get_session) ): """Request password reset via email verification (no database needed).""" # Find user by username statement = select(User).where(User.username == reset_request.username) user = session.exec(statement).first() # Always return success to prevent username enumeration if user and user.is_approved: # Send email with instructions (mock implementation) send_password_reset_email(reset_request.username, reset_request.email) return { "message": "If your username and email are correct, you will receive instructions to reset your password." }