How to Implement User Registration and Login with FastAPI and JWT
In this tutorial, you will learn how to add secure user registration and login functionality to your FastAPI application. We will cover password hashing using Argon2, generating and verifying JSON Web Tokens (JWT) for authentication, and setting up configuration management with Pydantic Settings. You will also learn how to create distinct user response schemas for public and private data and implement case-insensitive checks for user uniqueness.
Prerequisites
- Basic understanding of Python and FastAPI
- Familiarity with database operations (e.g., using SQLAlchemy with SQLite)
- An existing FastAPI project structure (as referenced from previous tutorials)
Step 1: Install Necessary Packages
First, we need to install three essential Python packages to handle authentication:
- `python-multipart`: Required for handling form data.
- `fastapi[security]`: Provides utilities for authentication, including OAuth2 password handling.
- `passlib[argon2]`: For secure password hashing. Argon2 is a modern and robust hashing algorithm.
- `python-jose[cryptography]`: A library for handling JSON Web Tokens (JWT).
- `pydantic-settings`: For centralized and validated application configuration.
You can install these using pip:
pip install python-multipart fastapi[security] passlib[argon2] python-jose[cryptography] pydantic-settings
Step 2: Update Database Models
Before adding authentication, we need to modify our user model to include a password hash. In your models.py file, locate the User class and add a password_hash field.
Important: Never store plain text passwords. Always store a secure hash.
Modify the User model in models.py:
# ... other imports
from sqlalchemy.orm import Mapped, mapped_column
class User(Base):
# ... other fields (id, username, email, image_file)
password_hash: Mapped[str] = mapped_column(String(200), nullable=False)
Note: For development with SQLite, it’s often easiest to delete and recreate the database when altering schemas with non-nullable fields. In production, use database migrations (e.g., with Alembic).
Step 3: Update Schemas
In your schemas.py file, you’ll need to adjust the Pydantic schemas for user creation and responses:
User Creation Schema
Add a password field to the UserCreate schema. This is what the user will submit during registration. We’ll enforce a minimum length for security.
# ... other imports
from pydantic import Field
class UserBase(BaseModel):
username: str
email: str
class UserCreate(UserBase):
password: str = Field(min_length=8)
User Response Schemas
To enhance privacy, we’ll create distinct schemas for public and private user data. The UserPublic schema will exclude sensitive information like email, while UserPrivate will include it.
# ... other imports
class UserPublic(BaseModel):
id: int
username: str
image_file: Optional[str] = None
class UserPrivate(UserPublic):
email: str
Update the PostResponse schema to use UserPublic for the author field:
# ... other imports
class PostBase(BaseModel):
title: str
content: str
class PostCreate(PostBase):
pass
class Post(PostBase):
id: int
author_id: int
author: UserPublic # Changed from UserResponse
class PostResponse(Post):
pass
Token Schema
Create a Token schema to represent the JWT response after successful login.
# ... other imports
class Token(BaseModel):
access_token: str
token_type: str
Step 4: Configure Application Settings
Create a config.py file in the root of your project to manage application settings, including your JWT secret key. Pydantic Settings provides type validation and environment variable loading.
Create config.py:
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import SecretStr
class Settings(BaseSettings):
SECRET_KEY: SecretStr
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
model_config = SettingsConfigDict(
env_file='.env',
env_file_encoding='utf-8'
)
settings = Settings()
Create a .env file in the same directory to store your secret key. Ensure .env is added to your .gitignore file to prevent committing secrets.
Create .env:
SECRET_KEY=your_super_secret_key_here_generate_a_long_one
Tip: To generate a strong secret key, run the following in your Python interpreter or terminal:
python -c "import secrets; print(secrets.token_hex(32))"
Step 5: Create Authentication Utilities
Create a new file named auth_utils.py in your project’s root directory. This file will contain functions for password hashing, JWT creation/verification, and OAuth2 schema setup.
Create auth_utils.py:
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, Dict, Any
from config import settings
# Password Hashing
pwd_context = CryptContext.from_passlib(prefix="sha256_crypt", cryptos={'argon2': {'label': 'argon2'}}) # Using recommended Argon2
# OAuth2 Scheme
# The tokenUrl must match your login endpoint path
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/users/token")
def get_password_hash(password: str) -> str:
"""Hashes a plain text password."""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain text password against a hash."""
return pwd_context.verify(plain_password, hashed_password)
# JWT Token Utilities
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""Creates a JWT access token."""
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(data.get("user_id"))}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY.get_secret_value(), algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_access_token(token: str) -> Optional[str]:
"""Verifies a JWT access token and returns the user ID or None."""
try:
payload = jwt.decode(token, settings.SECRET_KEY.get_secret_value(), algorithms=[settings.ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
return None
return user_id
except JWTError:
return None
# --- Schemas for token and request form ---
class TokenData(BaseModel):
user_id: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
Step 6: Update User Routes
Now, let’s integrate these authentication utilities into your user routes (e.g., in routers/users.py).
Imports
Add the necessary imports to your user routes file:
# ... other imports
from datetime import timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from sqlalchemy import func
from database import get_db
from models import User
from schemas import UserCreate, UserPublic, UserPrivate, Token
from auth_utils import (
oauth2_scheme,
create_access_token,
verify_access_token,
get_password_hash,
verify_password
)
from config import settings
Update Create User Route
Modify the create_user endpoint to hash the password and use the appropriate response schema.
Implement case-insensitive checks for username and email uniqueness. Store emails in lowercase.
@router.post("/", response_model=UserPrivate, status_code=status.HTTP_201_CREATED)
def create_user(
user: UserCreate,
db: Session = Depends(get_db)
):
# Check for existing username (case-insensitive)
existing_user = db.query(User).filter(func.lower(User.username) == func.lower(user.username)).first()
if existing_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
# Check for existing email (case-insensitive)
existing_email = db.query(User).filter(func.lower(User.email) == func.lower(user.email)).first()
if existing_email:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
hashed_password = get_password_hash(user.password)
db_user = User(
username=user.username,
email=user.email.lower(), # Store email in lowercase
password_hash=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
Create Login Endpoint
Add a new endpoint for user login that issues JWTs upon successful authentication.
@router.post("/token", response_model=Token)
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user = db.query(User).filter(func.lower(User.email) == func.lower(form_data.username)).first() # Using form_data.username for email
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"user_id": user.id}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
Security Note: Returning a generic “Incorrect username or password” error prevents attackers from discovering whether an email exists in your system.
Create ‘/me’ Endpoint
Add an endpoint to retrieve the currently logged-in user’s information. This endpoint uses the JWT from the Authorization header.
Important: Place this endpoint before the get_user endpoint in your router to ensure correct route matching.
@router.get("/me", response_model=UserPrivate)
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
user_id = verify_access_token(token)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
user_id_int = int(user_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid user ID in token",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.id == user_id_int).first()
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user
Update Existing Routes
Update the get_user and update_user routes to use the new response schemas and implement case-insensitive checks where necessary.
@router.get("/{user_id}", response_model=UserPublic)
def get_user(
user_id: int,
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
return db_user
@router.put("/{user_id}", response_model=UserPrivate)
def update_user(
user_id: int,
user_update: UserCreate, # Assuming UserCreate can be used for updates, adjust if needed
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
# Case-insensitive checks for conflicts with other users
if func.lower(User.username) != func.lower(db_user.username) and db.query(User).filter(func.lower(User.username) == func.lower(user_update.username)).first():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
if func.lower(User.email) != func.lower(db_user.email) and db.query(User).filter(func.lower(User.email) == func.lower(user_update.email)).first():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
db_user.username = user_update.username
db_user.email = user_update.email.lower() # Normalize email to lowercase
# Only update password hash if a new password is provided
if user_update.password:
db_user.password_hash = get_password_hash(user_update.password)
db.commit()
db.refresh(db_user)
return db_user
Conclusion
You have now successfully implemented user registration and login using FastAPI, Argon2 for password hashing, and JWT for token-based authentication. This forms the foundation for securing your application’s routes and resources, which will be covered in the next part of this tutorial series.
Source: Python FastAPI Tutorial (Part 10): Authentication – Registration and Login with JWT (YouTube)