Files
CMT/backend/app/api/v1/users.py
T

319 lines
9.7 KiB
Python

# backend/app/api/v1/users.py
from datetime import timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer
from sqlmodel import Session, select
from app.core.db import get_session
from app.core.auth import (
authenticate_user,
create_access_token,
get_password_hash,
get_current_active_user,
get_token_expiration_minutes,
require_admin,
require_write_access,
require_any_access,
verify_password,
send_password_reset_email
)
from app.schemas.models import User
from app.schemas.schemas import (
UserCreate,
UserUpdate,
UserLogin,
Token,
UserResponse,
UserApprovalUpdate,
PasswordChangeRequest,
EmailVerificationRequest
)
# Router for the user-account endpoints defined in this module; every route
# registered on it is served under the "/users" prefix and tagged "users".
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/login", response_model=Token)
def login(user_credentials: UserLogin, session: Session = Depends(get_session)):
    """Authenticate a user and return a JWT whose lifetime depends on the role.

    Returns:
        Token: the bearer token, its lifetime in seconds, and the user's
        public fields.

    Raises:
        HTTPException: 403 when ``authenticate_user`` reports
            ``account_pending_approval``; 401 for every other failure.
    """
    user, error_message = authenticate_user(
        session, user_credentials.username, user_credentials.password
    )
    if not user:
        if error_message == "account_pending_approval":
            # NOTE(review): if authenticate_user reports this before checking
            # the password, the 403 still confirms the username exists.
            status_code = status.HTTP_403_FORBIDDEN
            detail = "Account pending admin approval. Please contact an administrator."
        elif error_message in ("user_not_found", "invalid_password"):
            # Deliberately the same message for both cases: telling callers
            # whether the username or the password was wrong lets an attacker
            # enumerate valid usernames.
            status_code = status.HTTP_401_UNAUTHORIZED
            detail = "Incorrect username or password"
        else:
            status_code = status.HTTP_401_UNAUTHORIZED
            detail = "Authentication failed"
        raise HTTPException(
            status_code=status_code,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Token lifetime is looked up per role.
    expire_minutes = get_token_expiration_minutes(user.role)
    access_token = create_access_token(
        data={
            "sub": user.username,
            "user_id": user.id,
            "role": user.role.value,
        },
        expires_delta=timedelta(minutes=expire_minutes),
    )
    return Token(
        access_token=access_token,
        token_type="bearer",
        expires_in=expire_minutes * 60,  # minutes -> seconds
        user=UserResponse(
            id=user.id,
            username=user.username,
            role=user.role,
            is_approved=user.is_approved,
        ),
    )
@router.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_active_user)):
    """Return the public fields of the user behind the request's token.

    The user object is supplied by the ``get_current_active_user``
    dependency; this handler only copies its id, username, role and approval
    flag into a ``UserResponse``.
    """
    profile = {
        "id": current_user.id,
        "username": current_user.username,
        "role": current_user.role,
        "is_approved": current_user.is_approved,
    }
    return UserResponse(**profile)
@router.get("/", response_model=list[UserResponse])
def get_all_users(
    session: Session = Depends(get_session),
    current_user: User = Depends(require_admin),
    skip: int = 0,
    limit: int = 100,
):
    """List users one page at a time (admin only).

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.

    Returns:
        list[UserResponse]: the requested page, ordered by user id.
    """
    # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so
    # consecutive pages could overlap or miss rows. Ordering by the primary
    # key makes pagination deterministic.
    statement = select(User).order_by(User.id).offset(skip).limit(limit)
    users = session.exec(statement).all()
    return [
        UserResponse(
            id=user.id,
            username=user.username,
            role=user.role,
            is_approved=user.is_approved,
        )
        for user in users
    ]
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    user: UserCreate,
    session: Session = Depends(get_session)
):
    """Register a new account through the public sign-up endpoint.

    The account is always stored with ``is_approved=False``, so an admin has
    to approve it afterwards.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # NOTE(review): the role comes straight from the request body and this
    # endpoint has no auth dependency - confirm UserCreate restricts which
    # roles a caller may request.
    username_taken = session.exec(
        select(User).where(User.username == user.username)
    ).first()
    if username_taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    # Only the hash is persisted, never the plain-text password.
    new_account = User(
        username=user.username,
        password_hash=get_password_hash(user.password),
        role=user.role,
        is_approved=False,
    )
    session.add(new_account)
    session.commit()
    session.refresh(new_account)

    return UserResponse(
        id=new_account.id,
        username=new_account.username,
        role=new_account.role,
        is_approved=new_account.is_approved,
    )
@router.get("/{user_id}", response_model=UserResponse)
def get_user(
    user_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(require_any_access)
):
    """Look up one user by primary key.

    Access is gated by the ``require_any_access`` dependency.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    found = session.get(User, user_id)
    if found:
        return UserResponse(
            id=found.id,
            username=found.username,
            role=found.role,
            is_approved=found.is_approved,
        )
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
# NOTE: this endpoint is admin-only; users change their own password through
# PUT /users/me/change-password.
@router.put("/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    user_update: UserUpdate,
    session: Session = Depends(get_session),
    current_user: User = Depends(require_admin),
):
    """Apply a partial update to a user (admin only).

    Only fields present in the request body are changed. A supplied password
    is hashed and stored as ``password_hash``; an explicit ``null`` password
    is treated as "no change".

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            requested username already belongs to another user.
    """
    user = session.get(User, user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # exclude_unset keeps only the fields the client actually sent.
    update_data = user_update.model_dump(exclude_unset=True)

    # Check for a clash only when the username really changes.
    if "username" in update_data and update_data["username"] != user.username:
        statement = select(User).where(User.username == update_data["username"])
        if session.exec(statement).first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already exists",
            )

    # Always remove the plain-text password from the field map. Hash it only
    # when a real value was sent: an explicit null survives exclude_unset and
    # must not reach the hasher.
    new_password = update_data.pop("password", None)
    if new_password is not None:
        update_data["password_hash"] = get_password_hash(new_password)

    for key, value in update_data.items():
        setattr(user, key, value)

    session.add(user)
    session.commit()
    session.refresh(user)

    return UserResponse(
        id=user.id,
        username=user.username,
        role=user.role,
        is_approved=user.is_approved,
    )
@router.put("/{user_id}/approval", response_model=UserResponse)
def update_user_approval(
    user_id: int,
    approval_update: UserApprovalUpdate,
    session: Session = Depends(get_session),
    current_user: User = Depends(require_admin)
):
    """Set the approval flag of a user account (admin only).

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    target = session.get(User, user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Persist the new flag, then reload the row before building the response.
    target.is_approved = approval_update.is_approved
    session.add(target)
    session.commit()
    session.refresh(target)

    fields = {
        "id": target.id,
        "username": target.username,
        "role": target.role,
        "is_approved": target.is_approved,
    }
    return UserResponse(**fields)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(require_admin)
):
    """Remove a user account (admin only).

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the admin
            tries to delete their own account.
    """
    target = session.get(User, user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # An admin may not delete the account they are logged in with.
    deleting_self = target.id == current_user.id
    if deleting_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account",
        )

    session.delete(target)
    session.commit()
@router.put("/me/change-password")
def change_password(
    password_change: PasswordChangeRequest,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_active_user)
):
    """Replace the caller's password after verifying the current one.

    Returns:
        dict: a confirmation message.

    Raises:
        HTTPException: 400 when the supplied current password does not match
            the stored hash.
    """
    current_matches = verify_password(
        password_change.current_password, current_user.password_hash
    )
    if not current_matches:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect",
        )

    # Store only the hash of the new password.
    current_user.password_hash = get_password_hash(password_change.new_password)
    session.add(current_user)
    session.commit()

    return {"message": "Password changed successfully"}
@router.post("/request-password-reset")
def request_password_reset(
    reset_request: EmailVerificationRequest,
    session: Session = Depends(get_session)
):
    """Start a password reset for the given username.

    Looks the user up by username and, when an approved account exists,
    calls ``send_password_reset_email`` with the username and the email from
    the request. The response is identical whether or not an email was sent,
    so callers cannot use it to probe for valid usernames.
    """
    generic_reply = {
        "message": "If your username and email are correct, you will receive instructions to reset your password."
    }

    account = session.exec(
        select(User).where(User.username == reset_request.username)
    ).first()

    # Unknown or not-yet-approved accounts get the same reply, without email.
    if not account or not account.is_approved:
        return generic_reply

    # NOTE(review): the email address comes from the request and is not
    # compared against anything stored on the user here - confirm that
    # send_password_reset_email validates it.
    send_password_reset_email(reset_request.username, reset_request.email)
    return generic_reply