Authentication FastAPI with Keycloak

First you need to create a realm and client, and get the client id and secret.

Go to Keycloak configuration problems and solutions to see how to setup at Keycloak Admin Panel.

1 Install dependencies

pip install python-jose[cryptography]>=3.3.0
pip install cryptography>=3.4.0
pip install PyJWT==2.10.1

Setup configuration

Add following to your configuration file, like config.py:

from os import getenv

# Keycloak Settings
KEYCLOAK_URL = getenv("KEYCLOAK_URL")
KEYCLOAK_REALM = getenv("KEYCLOAK_REALM")
KEYCLOAK_CLIENT_ID = getenv("KEYCLOAK_CLIENT_ID")
KEYCLOAK_CLIENT_SECRET = getenv("KEYCLOAK_CLIENT_SECRET") 
KEYCLOAK_ALGORITHM = getenv("KEYCLOAK_ALGORITHM", "ES256")

# OpenID Connect endpoints
OIDC_JWKS_URI = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs"
OIDC_TOKEN_ENDPOINT = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"

2 Add Authentication Service

let’s name it auth_service.py:

from typing import Dict, Any
from jose import jwt, JWTError
from jose.exceptions import JWTClaimsError
from jwt import PyJWKClient
from fastapi import HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer

from src.config import (
    KEYCLOAK_ALGORITHM,
    OIDC_JWKS_URI,
    OIDC_TOKEN_ENDPOINT,
    KEYCLOAK_CLIENT_ID,
    KEYCLOAK_CLIENT_SECRET,
)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl=OIDC_TOKEN_ENDPOINT)
jwks_client = PyJWKClient(OIDC_JWKS_URI)

def decode_token(token: str) -> Dict[str, Any]:
    try:
        # Extract the key from the JWKS endpoint
        signing_key = jwks_client.get_signing_key_from_jwt(token).key

        # Decode and verify the token
        payload = jwt.decode(
            token,
            signing_key,
            algorithms=[KEYCLOAK_ALGORITHM],
            audience=KEYCLOAK_CLIENT_ID,
            options={"verify_exp": True}
        )
        return payload
    except JWTClaimsError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token claims: {str(e)}"
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        )

async def get_current_user(token: str = Depends(oauth2_scheme)):
    if not KEYCLOAK_CLIENT_SECRET:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Keycloak client secret not configured"
        )

    payload = decode_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return payload

Leave a Reply

Your email address will not be published. Required fields are marked *

Name *