Creating A Django Middleware for JWT Authentication and SSO

Middleware is a component that hooks into Django’s request and response life cycle. In this tutorial, generated with the assistance of ChatGPT, we will build one middleware for authentication using JWT (JSON Web Tokens) and another for SSO (single sign-on).

Let us start

Creating the Middleware

import jwt
from django.conf import settings
from django.http import JsonResponse

class JWTAuthenticationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.path.startswith('/admin/'):
            # Skip authentication for Django admin
            return self.get_response(request)

        token = self.get_token_from_request(request)
        if not token:
            return self.build_unauthorized_response()

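        try:
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
            # Attach the authenticated user to the request object
            request.user = payload['user']
        except (jwt.InvalidTokenError, KeyError):
            # InvalidTokenError also covers expired tokens; KeyError covers
            # a token that has no 'user' claim
            return self.build_unauthorized_response()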

        response = self.get_response(request)

        return response

    def get_token_from_request(self, request):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return None

        parts = auth_header.split()
        # Expect exactly two parts: the 'Bearer' scheme and the token
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return None

        return parts[1]

    def build_unauthorized_response(self):
        return JsonResponse({'error': 'Unauthorized'}, status=401)

Here’s how it works:

  • The middleware is initialized with a get_response function, which represents the next middleware or view in the Django request/response cycle.
  • In the middleware’s __call__ method, we first check if the request is for the Django admin site. If so, we skip authentication and pass the request to the next middleware/view.
  • We then attempt to extract the JWT token from the request. If there is no token or the token is malformed, we return a 401 Unauthorized response.
  • If we have a valid token, we use the jwt.decode function to decode the payload. We assume that the JWT is signed using the HS256 algorithm and the same secret key as the rest of the Django application (specified in the settings.SECRET_KEY variable).
  • We attach the authenticated user to the request.user attribute, which can be accessed by subsequent middlewares/views.
  • Finally, we pass the request to the next middleware/view and return the response.
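
The extraction and verification steps above can be sketched without Django or PyJWT, which makes it easier to see what an HS256 check involves. This is a minimal sketch using only the standard library; the helper names (extract_bearer_token, sign_hs256, verify_hs256) are hypothetical and not part of any library, and a real library such as PyJWT also validates claims like exp, which this sketch does not.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def extract_bearer_token(auth_header):
    """Return the token from 'Bearer <token>', or None if malformed."""
    parts = (auth_header or "").split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


def sign_hs256(payload: dict, secret: str) -> str:
    """Build a compact HS256 token (hypothetical helper for this demo)."""
    header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = b64url_encode(json.dumps(payload).encode())
    signing_input = f"{header}.{body}".encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{body}.{b64url_encode(signature)}"


def verify_hs256(token: str, secret: str):
    """Return the payload if the HMAC-SHA256 signature matches, else None."""
    try:
        header_b64, body_b64, signature_b64 = token.split(".")
        header = json.loads(b64url_decode(header_b64))
        if header.get("alg") != "HS256":
            return None
        signing_input = f"{header_b64}.{body_b64}".encode("ascii")
        expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
        # compare_digest avoids leaking information through timing
        if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
            return None
        return json.loads(b64url_decode(body_b64))
    except (ValueError, TypeError, AttributeError):
        # Wrong number of segments, bad base64, or bad JSON
        return None


token = sign_hs256({"user": "alice"}, "demo-secret")
print(verify_hs256(extract_bearer_token(f"Bearer {token}"), "demo-secret"))
```

A token signed with a different secret, or a string that is not three dot-separated segments, comes back as None, which corresponds to the 401 branch in the middleware.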

To use this middleware, add it to your Django MIDDLEWARE setting:

MIDDLEWARE = [
    # ...
    'path.to.JWTAuthenticationMiddleware',
]
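
The position of the entry in MIDDLEWARE matters, because Django wraps the handlers in reverse order so that the first entry becomes the outermost layer. If django.contrib.auth.middleware.AuthenticationMiddleware is also listed, the JWT middleware most likely belongs after it, since both assign request.user. The sketch below imitates that wrapping with plain functions; the dict-based requests and the function names are stand-ins for illustration, not Django objects.

```python
def build_chain(middleware_factories, view):
    """Wrap the view so that the first factory ends up outermost."""
    handler = view
    for factory in reversed(middleware_factories):
        handler = factory(handler)
    return handler


def logging_middleware(get_response):
    def middleware(request):
        request["trace"].append("logging:before")
        response = get_response(request)
        request["trace"].append("logging:after")
        return response
    return middleware


def auth_middleware(get_response):
    def middleware(request):
        request["trace"].append("auth")
        if "token" not in request:
            # Short-circuit: the view is never called
            return {"status": 401}
        return get_response(request)
    return middleware


def view(request):
    request["trace"].append("view")
    return {"status": 200}


chain = build_chain([logging_middleware, auth_middleware], view)
request = {"trace": []}
print(chain(request), request["trace"])
```

Without a token the trace stops at the auth layer and the view never runs, which is the same short-circuit our middleware performs when it returns the 401 response.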

Creating A Middleware that Supports SSO

import jwt
import requests
from django.conf import settings
from django.http import JsonResponse

class OIDCAuthenticationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.path.startswith('/admin/'):
            # Skip authentication for Django admin
            return self.get_response(request)

        if 'authorization' not in request.headers:
            return self.build_unauthorized_response()

        token = self.get_token_from_request(request)
        if not token:
            return self.build_unauthorized_response()

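        try:
            # Decode the claims without verifying the signature; PyJWT 2.x
            # takes this as an option instead of the old verify=False argument
            claims = jwt.decode(token, options={'verify_signature': False})
        except jwt.InvalidTokenError:
            return self.build_unauthorized_response()

        # The bearer token itself is the access token sent to the provider
        userinfo = self.get_userinfo(token)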
        if not userinfo:
            return self.build_unauthorized_response()

        # Attach the authenticated user to the request object
        request.user = userinfo

        response = self.get_response(request)

        return response

    def get_token_from_request(self, request):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return None

        parts = auth_header.split()
        # Expect exactly two parts: the 'Bearer' scheme and the token
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return None

        return parts[1]

    def get_userinfo(self, access_token):
        headers = {'Authorization': f'Bearer {access_token}'}
        try:
            response = requests.get(
                settings.OIDC_USERINFO_ENDPOINT, headers=headers, timeout=10
            )
        except requests.RequestException:
            # Treat network failures like a rejected token
            return None
        if response.status_code != 200:
            return None
        return response.json()

    def build_unauthorized_response(self):
        return JsonResponse({'error': 'Unauthorized'}, status=401)

One with decode via JWT

Here’s how it works:

  • The middleware is initialized with a get_response function, which represents the next middleware or view in the Django request/response cycle.
  • In the middleware’s __call__ method, we first check if the request is for the Django admin site. If so, we skip authentication and pass the request to the next middleware/view.
  • We then attempt to extract the OIDC access token from the request headers. If there is no token or the token is malformed, we return a 401 Unauthorized response.
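  • If we have a token, we use the jwt.decode function to read its claims with signature verification turned off. Note that the middleware shown here never checks the signature against the OIDC provider’s JSON Web Keys (JWKs); it relies on the provider’s /userinfo endpoint in the next step to reject tokens that are not valid.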
  • We then use the access token to fetch the authenticated user’s userinfo from the OIDC provider’s /userinfo endpoint. If the userinfo request fails or returns a non-200 response, we return a 401 Unauthorized response.
  • Finally, we attach the authenticated user’s userinfo to the request.user attribute, which can be accessed by subsequent middlewares/views.
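
It helps to see why claims decoded without signature verification cannot be trusted on their own. The sketch below uses only the standard library; decode_claims_unverified and forge_token are hypothetical helpers written for this illustration. It shows that anyone can build a token whose payload decodes cleanly, which is why the middleware leans on the provider’s /userinfo endpoint.

```python
import base64
import json


def decode_claims_unverified(token: str) -> dict:
    """Read the payload segment of a JWT without checking its signature."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def forge_token(claims: dict) -> str:
    """Build a token with a junk signature (what an attacker could do)."""
    def encode(obj):
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
    return f"{encode({'alg': 'HS256'})}.{encode(claims)}.not-a-real-signature"


forged = forge_token({"sub": "admin"})
# The forged claims decode without any error
print(decode_claims_unverified(forged))
```

Treat unverified claims as a hint about the token’s shape only; authorization decisions should rest on a verified signature or on the provider’s answer.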

To use this middleware, add it to your Django MIDDLEWARE setting:

MIDDLEWARE = [
    # ...
    'path.to.OIDCAuthenticationMiddleware',
]

Make sure to replace path.to with the actual path to the middleware file. Additionally, you’ll need to set the following settings in your Django settings.py file:

# OIDC provider configuration

OIDC_ISSUER = 'https://accounts.google.com'
OIDC_CLIENT_ID = 'your-client-id'
OIDC_CLIENT_SECRET = 'your-client-secret'
OIDC_REDIRECT_URI = 'http://localhost:8000/oidc/callback'
OIDC_AUTHORIZATION_ENDPOINT = f'{OIDC_ISSUER}/o/oauth2/auth'
OIDC_TOKEN_ENDPOINT = f'{OIDC_ISSUER}/o/oauth2/token'
OID

Unit Test For Our Middleware

import jwt
import responses
from django.test import TestCase, RequestFactory
from django.conf import settings
from middleware import JWTAuthenticationMiddleware, OIDCAuthenticationMiddleware

class JWTAuthenticationMiddlewareTestCase(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        self.middleware = JWTAuthenticationMiddleware(lambda request: None)

    def test_valid_token(self):
        # Generate a valid JWT token for testing; the middleware reads the 'user' claim
        payload = {'user': 'testuser'}
        token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

        # Build a request with the token in the Authorization header
        request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {token}')

        # Call the middleware
        self.middleware(request)

        # Check that the request.user attribute was set to the 'user' claim
        self.assertEqual(request.user, payload['user'])

    def test_missing_token(self):
        # Build a request without an Authorization header
        request = self.factory.get('/')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)

    def test_invalid_token(self):
        # Build a request with an invalid token
        request = self.factory.get('/', HTTP_AUTHORIZATION='Bearer invalid')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)


class OIDCAuthenticationMiddlewareTestCase(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        self.middleware = OIDCAuthenticationMiddleware(lambda request: None)

    @responses.activate
    def test_valid_token(self):
        # Set up a mock response from the OIDC provider's /userinfo endpoint
        userinfo = {'sub': 'testuser'}
        responses.add(responses.GET, settings.OIDC_USERINFO_ENDPOINT, json=userinfo)

        # Generate a valid OIDC access token for testing
        claims = {'sub': 'testuser', 'iss': settings.OIDC_ISSUER}
        token = jwt.encode(claims, settings.OIDC_CLIENT_SECRET, algorithm='HS256')

        # Build a request with the token in the Authorization header
        request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {token}')

        # Call the middleware
        self.middleware(request)

        # Check that the request.user attribute was set to the userinfo
        self.assertEqual(request.user, userinfo)

    def test_missing_token(self):
        # Build a request without an Authorization header
        request = self.factory.get('/')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)

    @responses.activate
    def test_invalid_token(self):
        # Set up a mock response from the OIDC provider's /userinfo endpoint
        responses.add(responses.GET, settings.OIDC_USERINFO_ENDPOINT, status=401)

        # Build a request with an invalid token
        request = self.factory.get('/', HTTP_AUTHORIZATION='Bearer invalid')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)

These tests use the RequestFactory from Django’s testing framework to create requests with various headers and verify that the middleware behaves correctly. The responses library is used to mock the OIDC provider’s /userinfo endpoint and simulate different responses.

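To build API endpoints on top of this, we configure Django REST framework, SimpleJWT, and django-cors-headers in settings.py:

INSTALLED_APPS = [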
    # ...
    'rest_framework',
    'corsheaders',
]

MIDDLEWARE = [
    # ...
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
]

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework_simplejwt.authentication.JWTAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
}

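SIMPLE_JWT = {
    'AUTH_HEADER_TYPES': ('Bearer',),
    # Model field used to identify the user; 'sub' would only work on a
    # custom user model that actually has a field with that name
    'USER_ID_FIELD': 'id',
    # Name of the claim that carries the identifier inside the token
    'USER_ID_CLAIM': 'sub',
}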

Define the Endpoints

Let’s define the API endpoints for the JWT authentication system with Django REST framework, SimpleJWT, and OpenID Connect single sign-on:

  1. /api/token: This endpoint is used to obtain a JWT access token and refresh token by sending a POST request with the username and password in the request body. The response will contain the access and refresh tokens.
  2. /api/hello-world: This endpoint is used to test the JWT authentication by sending a GET request with the access token in the Authorization header. The response will contain a message with the user ID.
  3. /oidc/login: This endpoint initiates the OpenID Connect authentication process by redirecting the user to the authorization server’s login page.
  4. /oidc/callback: This endpoint is used by the authorization server to return the authorization code to the client application. The client application can then exchange the authorization code for an access token.
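
From the client’s side, the first two endpoints could be exercised roughly as follows. This is a sketch using only the standard library; BASE_URL and the helper names are assumptions made for the example, and fetch_json needs a running server, so it is defined here but not called.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local development server


def build_token_request(username: str, password: str) -> urllib.request.Request:
    """POST form-encoded credentials to /api/token."""
    body = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        f"{BASE_URL}/api/token",
        data=body.encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_hello_request(access_token: str) -> urllib.request.Request:
    """GET /api/hello-world with the access token as a Bearer credential."""
    return urllib.request.Request(
        f"{BASE_URL}/api/hello-world",
        headers={"Authorization": f"Bearer {access_token}"},
    )


def fetch_json(request: urllib.request.Request) -> dict:
    """Send the request and parse the JSON body (requires a live server)."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


token_request = build_token_request("alice", "s3cret pw")
print(token_request.get_method(), token_request.full_url)
```

With the server running, fetch_json(build_token_request(...)) should return the access and refresh tokens, and the access value can then be passed to build_hello_request.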

Here’s what the URLs for these API endpoints look like:

from django.urls import path
from .views import HelloWorldView, obtain_token, oidc_callback, oidc_login

urlpatterns = [
    path('api/token', obtain_token, name='token_obtain'),
    path('api/hello-world', HelloWorldView.as_view(), name='hello_world'),
    path('oidc/login', oidc_login, name='oidc_login'),
    path('oidc/callback', oidc_callback, name='oidc_callback'),
]

The content of our views.py will be:

from django.contrib.auth import authenticate
from django.http import JsonResponse
from django.shortcuts import redirect
from django.views.decorators.csrf import csrf_exempt
from rest_framework import permissions
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from .utils import get_token_data, get_user_info_from_oidc

@csrf_exempt
def obtain_token(request):
    """
    API endpoint to obtain a JWT access token and refresh token
    """
    if request.method == 'POST':
        username = request.POST.get('username')
        password = request.POST.get('password')
        user = authenticate(request, username=username, password=password)
        if user is not None:
            refresh = RefreshToken.for_user(user)
            response_data = {
                'access': str(refresh.access_token),
                'refresh': str(refresh),
            }
            return JsonResponse(response_data)
        else:
            return JsonResponse({'error': 'Invalid credentials'}, status=400)
    else:
        return JsonResponse({'error': 'Invalid request method'}, status=405)

class HelloWorldView(APIView):
    """
    API endpoint to test the JWT authentication
    """
    # permission_classes is only enforced by DRF views such as APIView,
    # not by Django's plain View class
    permission_classes = [permissions.IsAuthenticated]

    def get(self, request, *args, **kwargs):
        user_id = get_token_data(request)['user_id']
        return JsonResponse({'message': f'Hello World! Your user ID is {user_id}'})

def oidc_login(request):
    """
    Endpoint that initiates the OpenID Connect authentication process
    """
    # TODO: Implement this function
    return redirect('https://your-auth-server.com/login')

def oidc_callback(request):
    """
    Endpoint that handles the OpenID Connect callback and retrieves the access token
    """
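    authorization_code = request.GET.get('code')
    if not authorization_code:
        return JsonResponse({'error': 'Missing authorization code'}, status=400)
    token_data = get_user_info_from_oidc(authorization_code)
    if 'access_token' not in token_data:
        # The provider returned an error payload instead of tokens
        return JsonResponse({'error': 'Token exchange failed'}, status=400)
    response_data = {
        'access_token': token_data['access_token'],
        'id_token': token_data.get('id_token'),
    }
    return JsonResponse(response_data)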

Note that the oidc_login function and get_user_info_from_oidc function need to be implemented in order to complete the OpenID Connect authentication process. The get_token_data function is a custom utility function that retrieves the user ID from the JWT access token.
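
The post does not show get_token_data, so here is one hypothetical way it could look. The sketch assumes the view runs behind DRF’s JWTAuthentication, in which case request.auth should hold the validated SimpleJWT token with its claims in a payload dict, and it assumes the identifier lives in the 'sub' claim as configured in SIMPLE_JWT above. The stand-in request at the bottom is built with SimpleNamespace purely for demonstration.

```python
from types import SimpleNamespace


def get_token_data(request, user_id_claim="sub"):
    """Return {'user_id': ...} read from the validated token on the request.

    Hypothetical helper: relies on request.auth.payload being the claims dict.
    """
    token = getattr(request, "auth", None)
    payload = getattr(token, "payload", None) or {}
    return {"user_id": payload.get(user_id_claim)}


# Stand-in for an authenticated DRF request, for demonstration only
fake_request = SimpleNamespace(auth=SimpleNamespace(payload={"sub": 42}))
print(get_token_data(fake_request))
```

If the request was not authenticated with a token, the helper returns a user_id of None instead of raising, so the view can decide how to respond.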

import json
from urllib.parse import urlencode

import requests
from django.shortcuts import redirect

def oidc_login(request):
    """
    Endpoint that initiates the OpenID Connect authentication process
    """
    # Configuration variables
    authorization_endpoint = 'https://your-auth-server.com/authorize'
    client_id = 'your-client-id'
    redirect_uri = 'http://localhost:8000/oidc/callback'
    scope = 'openid profile email'

    # Build authorization URL
    params = {
        'response_type': 'code',
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'scope': scope,
    }
    # urlencode escapes the spaces in scope and the URL in redirect_uri
    auth_url = authorization_endpoint + '?' + urlencode(params)

    # Redirect to authorization URL
    return redirect(auth_url)

def get_user_info_from_oidc(authorization_code):
    """
    Function that retrieves the user information from the OpenID Connect server
    """
    # Configuration variables
    token_endpoint = 'https://your-auth-server.com/token'
    client_id = 'your-client-id'
    client_secret = 'your-client-secret'
    redirect_uri = 'http://localhost:8000/oidc/callback'

    # Build token request
    data = {
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'client_id': client_id,
        'client_secret': client_secret,
        'redirect_uri': redirect_uri,
    }

    # Send token request and retrieve access token
    response = requests.post(token_endpoint, data=data, timeout=10)
    token_data = json.loads(response.content)

    # Return token data
    return token_data

Note that the oidc_login function uses configuration variables such as the authorization endpoint, client ID, redirect URI, and scope, which should be customized for your specific OpenID Connect authentication setup. Similarly, the get_user_info_from_oidc function uses the token endpoint, client ID, client secret, redirect URI, and grant type that are specific to your OpenID Connect server.
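
One parameter the authorization request above leaves out is state, which OAuth 2.0 recommends for protecting the callback against cross-site request forgery. The following is a minimal sketch with the standard library; the function names are hypothetical, and where the state value is kept between the login and the callback (for example in the Django session) is left as an assumption.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def new_state() -> str:
    """Generate an unguessable value to tie the callback to this login."""
    return secrets.token_urlsafe(32)


def build_authorization_url(endpoint, client_id, redirect_uri, scope, state):
    """Build the authorization URL with properly encoded query parameters."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
    }
    return f"{endpoint}?{urlencode(params)}"


def state_matches(expected, received) -> bool:
    """Compare the stored state with the one returned to the callback."""
    if not expected or not received:
        return False
    return hmac.compare_digest(expected.encode(), received.encode())


state = new_state()
url = build_authorization_url(
    "https://your-auth-server.com/authorize",
    "your-client-id",
    "http://localhost:8000/oidc/callback",
    "openid profile email",
    state,
)
# Parsing the URL back shows that the scope survived encoding intact
print(parse_qs(urlsplit(url).query)["scope"])
```

In oidc_login the state would be stored before redirecting, and oidc_callback would call state_matches on request.GET.get('state') before exchanging the code.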

Thanks For Your Attention
Jesus Saves
