Implement User Registration and Login with FastAPI and JWT

In this tutorial, you will learn how to add secure user registration and login functionality to your FastAPI application. We will cover password hashing using Argon2, generating and verifying JSON Web Tokens (JWT) for authentication, and setting up configuration management with Pydantic Settings. You will also learn how to create distinct user response schemas for public and private data and implement case-insensitive checks for user uniqueness.

Prerequisites

  • Basic understanding of Python and FastAPI
  • Familiarity with database operations (e.g., using SQLAlchemy with SQLite)
  • An existing FastAPI project structure (as referenced from previous tutorials)

Step 1: Install Necessary Packages

First, install the Python packages this tutorial relies on:

  • `python-multipart`: Required for parsing form data, which the login endpoint receives.
  • `fastapi`: Already part of your project. Its built-in `fastapi.security` module provides the authentication utilities used here, including OAuth2 password handling, so no separate extra is needed.
  • `passlib[argon2]`: For secure password hashing. Argon2 is a modern and robust hashing algorithm.
  • `python-jose[cryptography]`: A library for handling JSON Web Tokens (JWT).
  • `pydantic-settings`: For centralized and validated application configuration.

You can install these using pip (the quotes keep shells such as zsh from interpreting the square brackets):

pip install python-multipart fastapi "passlib[argon2]" "python-jose[cryptography]" pydantic-settings

Step 2: Update Database Models

Before adding authentication, we need to modify our user model to include a password hash. In your models.py file, locate the User class and add a password_hash field.

Important: Never store plain text passwords. Always store a secure hash.

Modify the User model in models.py:

# ... other imports
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

class User(Base):
    # ... other fields (id, username, email, image_file)
    # Stores the Argon2 hash produced in Step 5, never the plain text password
    password_hash: Mapped[str] = mapped_column(String(200), nullable=False)

Note: For development with SQLite, it’s often easiest to delete and recreate the database when altering schemas with non-nullable fields. In production, use database migrations (e.g., with Alembic).
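
To make the "store a hash, not the password" rule concrete, here is a minimal standard-library sketch of salted hashing and verification. It uses PBKDF2 from `hashlib` purely as a stand-in so that it runs without extra packages; the tutorial itself uses Argon2 through passlib in Step 5. The function names, the storage format, and the iteration count are illustrative assumptions, not part of the tutorial's code.

```python
import hashlib
import hmac
import secrets

# Illustrative work factor only; not a tuned recommendation.
ITERATIONS = 100_000


def hash_password(password: str) -> str:
    """Return a salted PBKDF2 hash as 'iterations$salt_hex$digest_hex'."""
    salt = secrets.token_bytes(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{ITERATIONS}${salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because each call draws a new salt, hashing the same password twice yields two different stored values, yet both still verify. This is the same property the password_hash column relies on.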

Step 3: Update Schemas

In your schemas.py file, you’ll need to adjust the Pydantic schemas for user creation and responses:

User Creation Schema

Add a password field to the UserCreate schema. This is what the user will submit during registration. We’ll enforce a minimum length for security.

# ... other imports
from pydantic import Field

class UserBase(BaseModel):
    username: str
    email: str

class UserCreate(UserBase):
    password: str = Field(min_length=8)

User Response Schemas

To enhance privacy, we’ll create distinct schemas for public and private user data. The UserPublic schema will exclude sensitive information like email, while UserPrivate will include it.

# ... other imports
from typing import Optional

class UserPublic(BaseModel):
    id: int
    username: str
    image_file: Optional[str] = None

class UserPrivate(UserPublic):
    email: str

Update the post schemas so that the author field uses UserPublic. PostResponse inherits the change from Post:

# ... other imports

class PostBase(BaseModel):
    title: str
    content: str

class PostCreate(PostBase):
    pass

class Post(PostBase):
    id: int
    author_id: int
    author: UserPublic # Changed from UserResponse

class PostResponse(Post):
    pass

Token Schema

Create a Token schema to represent the JWT response after successful login.

# ... other imports

class Token(BaseModel):
    access_token: str
    token_type: str

Step 4: Configure Application Settings

Create a config.py file in the root of your project to manage application settings, including your JWT secret key. Pydantic Settings provides type validation and environment variable loading.

Create config.py:

from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import SecretStr

class Settings(BaseSettings):
    SECRET_KEY: SecretStr
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8'
    )

settings = Settings()

Create a .env file in the same directory to store your secret key. Ensure .env is added to your .gitignore file to prevent committing secrets.

Create .env:

SECRET_KEY=your_super_secret_key_here_generate_a_long_one

Tip: To generate a strong secret key, run the following command in your terminal:

python -c "import secrets; print(secrets.token_hex(32))"

Step 5: Create Authentication Utilities

Create a new file named auth_utils.py in your project’s root directory. This file will contain functions for password hashing, JWT creation and verification, and the OAuth2 scheme setup.

Create auth_utils.py:

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import Optional, Dict, Any

from config import settings

# Password Hashing
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")  # Hash and verify with Argon2

# OAuth2 Scheme
# The tokenUrl must match your login endpoint path
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/token")

def get_password_hash(password: str) -> str:
    """Hashes a plain text password."""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verifies a plain text password against a hash."""
    return pwd_context.verify(plain_password, hashed_password)

# JWT Token Utilities
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    """Creates a JWT access token."""
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    
    to_encode = {"exp": expire, "sub": str(data.get("user_id"))}
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY.get_secret_value(), algorithm=settings.ALGORITHM)
    return encoded_jwt

def verify_access_token(token: str) -> Optional[str]:
    """Verifies a JWT access token and returns the user ID or None."""
    try:
        payload = jwt.decode(token, settings.SECRET_KEY.get_secret_value(), algorithms=[settings.ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            return None
        return user_id
    except JWTError:
        return None

# --- Schema for the decoded token payload ---
# The Token response schema is defined in schemas.py (Step 3), so it is not repeated here.
class TokenData(BaseModel):
    user_id: Optional[str] = None
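
To show what create_access_token and verify_access_token rely on, here is a standard-library sketch of HS256 signing and checking. It is not how python-jose is implemented and should not replace it; the names sign_token and read_token are hypothetical, and the sketch skips checks a real library performs, such as validating the header's algorithm.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def read_token(token: str, secret: str) -> Optional[dict]:
    """Return the claims if the signature matches and 'exp' is in the future."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", secret)
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:  # wrong segment count, bad base64, bad JSON, non-ASCII input
        return None
    if not isinstance(claims, dict):
        return None
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp < time.time():
        return None
    return claims
```

The payload is only encoded, not encrypted, so anyone holding a token can read its claims. That is one reason the tutorial's token carries just the user ID and an expiry.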

Step 6: Update User Routes

Now, let’s integrate these authentication utilities into your user routes (e.g., in routers/users.py).

Imports

Add the necessary imports to your user routes file:

# ... other imports
from datetime import timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from sqlalchemy import func

from database import get_db
from models import User
from schemas import UserCreate, UserPublic, UserPrivate, Token
from auth_utils import ( 
    oauth2_scheme, 
    create_access_token, 
    verify_access_token, 
    get_password_hash, 
    verify_password 
)
from config import settings

Update Create User Route

Modify the create_user endpoint to hash the password and use the appropriate response schema.

Implement case-insensitive checks for username and email uniqueness. Store emails in lowercase.

@router.post("/", response_model=UserPrivate, status_code=status.HTTP_201_CREATED)
def create_user(
    user: UserCreate,
    db: Session = Depends(get_db)
):
    # Check for existing username (case-insensitive)
    existing_user = db.query(User).filter(func.lower(User.username) == func.lower(user.username)).first()
    if existing_user:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")

    # Check for existing email (case-insensitive)
    existing_email = db.query(User).filter(func.lower(User.email) == func.lower(user.email)).first()
    if existing_email:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")

    hashed_password = get_password_hash(user.password)
    db_user = User(
        username=user.username, 
        email=user.email.lower(), # Store email in lowercase
        password_hash=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
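
The checks above run in application code, so two simultaneous requests could both pass them before either row is committed. One possible safeguard, sketched here with the standard-library sqlite3 module rather than the tutorial's SQLAlchemy models, is a unique index on the lowercased column. The table name, index names, and try_insert helper are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL, email TEXT NOT NULL)"
)
# Expression indexes make the database itself reject case-insensitive duplicates.
conn.execute("CREATE UNIQUE INDEX ux_users_username_lower ON users (lower(username))")
conn.execute("CREATE UNIQUE INDEX ux_users_email_lower ON users (lower(email))")


def try_insert(username: str, email: str) -> bool:
    """Insert a user; return False if a case-insensitive duplicate exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO users (username, email) VALUES (?, ?)",
                (username, email.lower()),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

With such an index in place, the route-level checks still provide friendly error messages, while the database guarantees uniqueness even under concurrent requests.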

Create Login Endpoint

Add a new endpoint for user login that issues JWTs upon successful authentication.

@router.post("/token", response_model=Token)
def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    user = db.query(User).filter(func.lower(User.email) == func.lower(form_data.username)).first() # Using form_data.username for email
    
    if not user or not verify_password(form_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(data={"user_id": user.id}, expires_delta=access_token_expires)
    return {"access_token": access_token, "token_type": "bearer"}

Security Note: Returning a generic “Incorrect username or password” error prevents attackers from discovering whether an email exists in your system.
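
Because the endpoint depends on OAuth2PasswordRequestForm, clients must send the credentials as form data rather than JSON, with the email placed in the field named username. The sketch below only builds such a request with the standard library and does not send it. The base URL is a placeholder, and the /api/users/token path is taken from the tokenUrl configured in Step 5.

```python
import urllib.parse
import urllib.request


def build_login_request(base_url: str, email: str, password: str) -> urllib.request.Request:
    """Build (but do not send) a form-encoded login request."""
    # The OAuth2 password flow names the identifier field "username",
    # even though this tutorial looks the user up by email.
    body = urllib.parse.urlencode({"username": email, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/api/users/token",
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


# Placeholder host and credentials, for illustration only.
request = build_login_request("http://localhost:8000", "a@example.com", "s3cret pass")
```

Passing this request to urllib.request.urlopen against a running server would return the JSON body containing access_token and token_type.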

Create ‘/me’ Endpoint

Add an endpoint to retrieve the currently logged-in user’s information. This endpoint uses the JWT from the Authorization header.

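Important: Place this endpoint before the get_user endpoint in your router. FastAPI matches routes in the order they are declared, so if /{user_id} came first, a request to /me would be matched by it and rejected because "me" is not a valid integer.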

@router.get("/me", response_model=UserPrivate)
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    user_id = verify_access_token(token)
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    try:
        user_id_int = int(user_id)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid user ID in token",
            headers={"WWW-Authenticate": "Bearer"},
        )
        
    user = db.query(User).filter(User.id == user_id_int).first()
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user
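
The oauth2_scheme dependency reads the token from the Authorization header before the route body runs. As a rough standard-library sketch of that parsing step (the function name is hypothetical, and the real dependency raises a 401 error instead of returning None):

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # The scheme name is compared case-insensitively; an empty token is rejected.
    if scheme.lower() != "bearer" or not token:
        return None
    return token
```

A client therefore calls /me by sending the access_token from the login response in a header of the form Authorization: Bearer followed by the token.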

Update Existing Routes

Update the get_user and update_user routes to use the new response schemas and implement case-insensitive checks where necessary.

@router.get("/{user_id}", response_model=UserPublic)
def get_user(
    user_id: int,
    db: Session = Depends(get_db)
):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return db_user

@router.put("/{user_id}", response_model=UserPrivate)
def update_user(
    user_id: int,
    user_update: UserCreate, # Assuming UserCreate can be used for updates, adjust if needed
    db: Session = Depends(get_db)
):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # Case-insensitive conflict checks. Compare plain Python strings here:
    # func.lower(...) builds a SQL expression, not a boolean.
    new_username = user_update.username.lower()
    if new_username != db_user.username.lower() and db.query(User).filter(
        func.lower(User.username) == new_username, User.id != user_id
    ).first():
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
    new_email = user_update.email.lower()
    if new_email != db_user.email.lower() and db.query(User).filter(
        func.lower(User.email) == new_email, User.id != user_id
    ).first():
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")

    db_user.username = user_update.username
    db_user.email = new_email  # Normalize email to lowercase

    # UserCreate makes password required, so this branch always runs;
    # use a schema with an optional password if updates should be able to skip it
    if user_update.password:
        db_user.password_hash = get_password_hash(user_update.password)

    db.commit()
    db.refresh(db_user)
    return db_user

Conclusion

You have now successfully implemented user registration and login using FastAPI, Argon2 for password hashing, and JWT for token-based authentication. This forms the foundation for securing your application’s routes and resources, which will be covered in the next part of this tutorial series.


Source: Python FastAPI Tutorial (Part 10): Authentication – Registration and Login with JWT (YouTube)

Written by

John Digweed
