Middleware is a framework of hooks into Django’s request and response life cycle. In this tutorial, generated with the assistance of ChatGPT, we will build one middleware for authentication using JWT (JSON Web Tokens) and another for SSO (single sign-on).
Let us start.
Creating the Middleware
import jwt
from django.conf import settings
from django.http import JsonResponse


class JWTAuthenticationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.path.startswith('/admin/'):
            # Skip authentication for Django admin
            return self.get_response(request)

        token = self.get_token_from_request(request)
        if not token:
            return self.build_unauthorized_response()

        try:
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # Base class: covers malformed, badly signed, and expired tokens
            return self.build_unauthorized_response()

        user = payload.get('user')
        if user is None:
            # A token without a 'user' claim cannot identify anyone
            return self.build_unauthorized_response()

        # Attach the authenticated user to the request object
        request.user = user

        return self.get_response(request)

    def get_token_from_request(self, request):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return None

        parts = auth_header.split()
        # Expect exactly two parts: "Bearer <token>"
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return None

        return parts[1]

    def build_unauthorized_response(self):
        return JsonResponse({'error': 'Unauthorized'}, status=401)
Here’s how it works:
- The middleware is initialized with a get_response function, which represents the next middleware or view in the Django request/response cycle.
- In the middleware’s __call__ method, we first check if the request is for the Django admin site. If so, we skip authentication and pass the request to the next middleware/view.
- We then attempt to extract the JWT token from the request. If there is no token or the token is malformed, we return a 401 Unauthorized response.
- If we have a token, we use the jwt.decode function to verify its signature and decode the payload. We assume that the JWT is signed using the HS256 algorithm and the same secret key as the rest of the Django application (specified in the settings.SECRET_KEY variable).
- We attach the authenticated user to the request.user attribute, which can be accessed by subsequent middlewares/views.
- Finally, we pass the request to the next middleware/view and return the response.
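To make the HS256 assumption concrete, here is a minimal standard-library sketch of what signing and verifying such a token involves. The function names (sign_hs256, verify_hs256) are made up for illustration and are not part of PyJWT. The sketch also skips the claim checks (such as expiry) that jwt.decode performs, so treat it as an explanation of the signature step rather than a replacement for the library.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, the encoding JWT segments use."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode a base64url segment, restoring the padding JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256 (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the payload if the signature matches, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(b64url_decode(header_b64))
        signature = b64url_decode(signature_b64)
        signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    except ValueError as exc:
        # Wrong segment count, bad base64, and bad JSON all end up here
        raise ValueError("malformed token") from exc

    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")

    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(signature, expected):
        raise ValueError("signature mismatch")

    return json.loads(b64url_decode(payload_b64))


token = sign_hs256({"user": "alice"}, "not-a-real-secret")
payload = verify_hs256(token, "not-a-real-secret")
```

A token signed with one secret fails verification under any other secret, which is why the middleware and whatever issues the tokens must share settings.SECRET_KEY.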
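To use this middleware, add it to your Django MIDDLEWARE setting. If django.contrib.auth.middleware.AuthenticationMiddleware is enabled, list this middleware after it so that request.user is not overwritten:
MIDDLEWARE = [
    # ...
    'path.to.JWTAuthenticationMiddleware',
]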
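The order of entries in MIDDLEWARE matters because each middleware wraps the next one. The toy sketch below uses plain Python, no Django, and made-up names (TagMiddleware, BlockingMiddleware, build_chain) to show the nesting and how an authentication middleware can short-circuit the chain. It mirrors the idea only and is not Django's actual loader.

```python
from functools import partial


class TagMiddleware:
    """Toy middleware that logs before and after calling the inner handler."""

    def __init__(self, get_response, name, log):
        self.get_response = get_response
        self.name = name
        self.log = log

    def __call__(self, request):
        self.log.append(f"{self.name}: before")
        response = self.get_response(request)
        self.log.append(f"{self.name}: after")
        return response


class BlockingMiddleware:
    """Toy auth middleware: returns 401 without calling the inner handler."""

    def __init__(self, get_response, log):
        self.get_response = get_response
        self.log = log

    def __call__(self, request):
        if not request.get("token"):
            self.log.append("auth: rejected")
            return {"status": 401}
        return self.get_response(request)


def view(request):
    return {"status": 200}


def build_chain(factories, inner):
    """Wrap the view so the first listed middleware ends up outermost."""
    handler = inner
    for factory in reversed(factories):
        handler = factory(handler)
    return handler


log = []
chain = build_chain(
    [
        partial(TagMiddleware, name="first", log=log),
        partial(BlockingMiddleware, log=log),
        partial(TagMiddleware, name="last", log=log),
    ],
    view,
)

ok_response = chain({"token": "abc"})
ok_log = list(log)

log.clear()
rejected_response = chain({})
rejected_log = list(log)
```

With a token, both tagging middlewares run around the view. Without one, the request never reaches "last" or the view, but "first" still sees the 401 response on the way out.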
Creating A Middleware that Supports SSO
import jwt
import requests
from django.conf import settings
from django.http import JsonResponse


class OIDCAuthenticationMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.path.startswith('/admin/'):
            # Skip authentication for Django admin
            return self.get_response(request)

        token = self.get_token_from_request(request)
        if not token:
            return self.build_unauthorized_response()

        try:
            # Reject tokens that are not well-formed JWTs. The signature is
            # NOT verified here; this is the PyJWT 2.x form of verify=False.
            jwt.decode(token, options={'verify_signature': False})
        except jwt.InvalidTokenError:
            return self.build_unauthorized_response()

        # The bearer token itself is the access token sent to /userinfo
        userinfo = self.get_userinfo(token)
        if not userinfo:
            return self.build_unauthorized_response()

        # Attach the authenticated user to the request object
        request.user = userinfo

        return self.get_response(request)

    def get_token_from_request(self, request):
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return None

        parts = auth_header.split()
        # Expect exactly two parts: "Bearer <token>"
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return None

        return parts[1]

    def get_userinfo(self, access_token):
        headers = {'Authorization': f'Bearer {access_token}'}
        try:
            response = requests.get(
                settings.OIDC_USERINFO_ENDPOINT, headers=headers, timeout=5
            )
        except requests.RequestException:
            # Network errors and timeouts are treated as failed authentication
            return None

        if response.status_code != 200:
            return None

        return response.json()

    def build_unauthorized_response(self):
        return JsonResponse({'error': 'Unauthorized'}, status=401)
Here’s how it works:
- The middleware is initialized with a get_response function, which represents the next middleware or view in the Django request/response cycle.
- In the middleware’s __call__ method, we first check if the request is for the Django admin site. If so, we skip authentication and pass the request to the next middleware/view.
- We then attempt to extract the OIDC access token from the request headers. If there is no token or the token is malformed, we return a 401 Unauthorized response.
- If we have an access token, we use the jwt.decode function to decode the claims with signature verification disabled. Verifying the signature against the OIDC provider’s JSON Web Key (JWK) set is not shown in this tutorial.
- We then use the access token to fetch the authenticated user’s userinfo from the OIDC provider’s /userinfo endpoint. If the userinfo request fails or returns a non-200 response, we return a 401 Unauthorized response.
- Finally, we attach the authenticated user’s userinfo to the request.user attribute, which can be accessed by subsequent middlewares/views.
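Both middlewares extract the token from the Authorization header in the same way. As a standalone sketch, the parsing step can be written as a small function that is easy to test on its own. The name extract_bearer_token is illustrative, not something Django or PyJWT provides.

```python
from typing import Optional


def extract_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header, or None if malformed."""
    if not auth_header:
        return None

    parts = auth_header.split()
    # Checking the length first avoids an IndexError on whitespace-only headers
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None

    return parts[1]
```

Returning None for every malformed case lets the caller map all of them to a single 401 response.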
To use this middleware, add it to your Django MIDDLEWARE setting:
MIDDLEWARE = [
    # ...
    'path.to.OIDCAuthenticationMiddleware',
]
Make sure to replace path.to with the actual path to the middleware file. Additionally, you’ll need to set the following settings in your Django settings.py file:
# OIDC provider configuration
OIDC_ISSUER = 'https://accounts.google.com'
OIDC_CLIENT_ID = 'your-client-id'
OIDC_CLIENT_SECRET = 'your-client-secret'
OIDC_REDIRECT_URI = 'http://localhost:8000/oidc/callback'
OIDC_AUTHORIZATION_ENDPOINT = f'{OIDC_ISSUER}/o/oauth2/auth'
OIDC_TOKEN_ENDPOINT = f'{OIDC_ISSUER}/o/oauth2/token'
OID
Unit Test For Our Middleware
import jwt
import responses
from django.conf import settings
from django.http import HttpResponse
from django.test import RequestFactory, TestCase

from middleware import JWTAuthenticationMiddleware, OIDCAuthenticationMiddleware


class JWTAuthenticationMiddlewareTestCase(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        # Stand-in for the view: returns an empty 200 response
        self.middleware = JWTAuthenticationMiddleware(lambda request: HttpResponse())

    def test_valid_token(self):
        # Generate a valid JWT carrying the 'user' claim the middleware reads
        payload = {'user': 'testuser'}
        token = jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

        # Build a request with the token in the Authorization header
        request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {token}')

        # Call the middleware
        response = self.middleware(request)

        # Check that the request reached the view and request.user was set
        self.assertEqual(response.status_code, 200)
        self.assertEqual(request.user, payload['user'])

    def test_missing_token(self):
        # Build a request without an Authorization header
        request = self.factory.get('/')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)

    def test_invalid_token(self):
        # Build a request with an invalid token
        request = self.factory.get('/', HTTP_AUTHORIZATION='Bearer invalid')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)


class OIDCAuthenticationMiddlewareTestCase(TestCase):
    def setUp(self):
        self.factory = RequestFactory()
        # Stand-in for the view: returns an empty 200 response
        self.middleware = OIDCAuthenticationMiddleware(lambda request: HttpResponse())

    @responses.activate
    def test_valid_token(self):
        # Set up a mock response from the OIDC provider's /userinfo endpoint
        userinfo = {'sub': 'testuser'}
        responses.add(responses.GET, settings.OIDC_USERINFO_ENDPOINT, json=userinfo)

        # Generate a JWT-shaped OIDC access token for testing
        claims = {'sub': 'testuser', 'iss': settings.OIDC_ISSUER}
        token = jwt.encode(claims, settings.OIDC_CLIENT_SECRET, algorithm='HS256')

        # Build a request with the token in the Authorization header
        request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {token}')

        # Call the middleware
        response = self.middleware(request)

        # Check that the request reached the view and request.user holds the userinfo
        self.assertEqual(response.status_code, 200)
        self.assertEqual(request.user, userinfo)

    def test_missing_token(self):
        # Build a request without an Authorization header
        request = self.factory.get('/')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)

    @responses.activate
    def test_invalid_token(self):
        # Set up a mock response from the OIDC provider's /userinfo endpoint
        responses.add(responses.GET, settings.OIDC_USERINFO_ENDPOINT, status=401)

        # Build a request with an invalid token
        request = self.factory.get('/', HTTP_AUTHORIZATION='Bearer invalid')

        # Call the middleware
        response = self.middleware(request)

        # Check that the middleware returns a 401 Unauthorized response
        self.assertEqual(response.status_code, 401)
These tests use the RequestFactory from Django’s testing framework to create requests with various headers and verify that the middleware behaves correctly. The responses library is used to mock the OIDC provider’s /userinfo endpoint and simulate different responses.
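The remaining sections use Django REST framework with SimpleJWT. Add the following to settings.py:
INSTALLED_APPS = [
    # ...
    'rest_framework',
    'corsheaders',
]

MIDDLEWARE = [
    # ...
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.common.CommonMiddleware',
]

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework_simplejwt.authentication.JWTAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
}

SIMPLE_JWT = {
    'AUTH_HEADER_TYPES': ('Bearer',),
    # Field on the user model that identifies the user; Django's default
    # User model has an 'id' field but no 'sub' field
    'USER_ID_FIELD': 'id',
    # Name of the claim that stores that value in the token
    'USER_ID_CLAIM': 'sub',
}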
Define the Endpoints
Let’s define the API endpoints for the JWT authentication system built with Django REST framework, SimpleJWT, and OpenID Connect single sign-on:
- /api/token: This endpoint is used to obtain a JWT access token and refresh token by sending a POST request with the username and password in the request body. The response will contain the access and refresh tokens.
- /api/hello-world: This endpoint is used to test the JWT authentication by sending a GET request with the access token in the Authorization header. The response will contain a message with the user ID.
- /oidc/login: This endpoint initiates the OpenID Connect authentication process by redirecting the user to the authorization server’s login page.
- /oidc/callback: This endpoint is used by the authorization server to return the authorization code to the client application. The client application can then exchange the authorization code for an access token.
Here’s what the URL configuration for these API endpoints looks like:
from django.urls import path

from .views import HelloWorldView, obtain_token, oidc_callback, oidc_login

urlpatterns = [
    path('api/token', obtain_token, name='token_obtain'),
    path('api/hello-world', HelloWorldView.as_view(), name='hello_world'),
    path('oidc/login', oidc_login, name='oidc_login'),
    path('oidc/callback', oidc_callback, name='oidc_callback'),
]
The content of our views.py will be:
from django.contrib.auth import authenticate
from django.http import JsonResponse
from django.shortcuts import redirect
from django.views.decorators.csrf import csrf_exempt
from rest_framework import permissions
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken

from .utils import get_token_data, get_user_info_from_oidc


@csrf_exempt
def obtain_token(request):
    """API endpoint to obtain a JWT access token and refresh token."""
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=405)

    username = request.POST.get('username')
    password = request.POST.get('password')
    user = authenticate(request, username=username, password=password)
    if user is None:
        return JsonResponse({'error': 'Invalid credentials'}, status=401)

    refresh = RefreshToken.for_user(user)
    return JsonResponse({
        'access': str(refresh.access_token),
        'refresh': str(refresh),
    })


class HelloWorldView(APIView):
    """API endpoint to test the JWT authentication."""
    # APIView enforces permission_classes; Django's plain View ignores them
    permission_classes = [permissions.IsAuthenticated]

    def get(self, request, *args, **kwargs):
        user_id = get_token_data(request)['user_id']
        return JsonResponse({'message': f'Hello World! Your user ID is {user_id}'})


def oidc_login(request):
    """Endpoint that initiates the OpenID Connect authentication process."""
    # TODO: Implement this function
    return redirect('https://your-auth-server.com/login')


def oidc_callback(request):
    """Endpoint that handles the OpenID Connect callback and retrieves the access token."""
    authorization_code = request.GET.get('code')
    if not authorization_code:
        return JsonResponse({'error': 'Missing authorization code'}, status=400)

    token_data = get_user_info_from_oidc(authorization_code)
    response_data = {
        'access_token': token_data['access_token'],
        'id_token': token_data['id_token'],
    }
    return JsonResponse(response_data)
Note that the oidc_login function and get_user_info_from_oidc function need to be implemented in order to complete the OpenID Connect authentication process. The get_token_data function is a custom utility function that retrieves the user ID from the JWT access token.
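The tutorial does not show get_token_data, so the following is only one possible sketch. It assumes the view runs behind SimpleJWT's JWTAuthentication, so that request.auth is the validated token and exposes its claims as a payload dictionary. The user_id_claim parameter and the 'user_id' alias are assumptions made for this example, chosen to match the 'user_id' key the view reads.

```python
from types import SimpleNamespace


def get_token_data(request, user_id_claim="sub"):
    """Return the validated token's claims, with the user ID under 'user_id'.

    Hypothetical helper: assumes request.auth was set by the authentication
    layer and has a 'payload' dict of claims.
    """
    token = getattr(request, "auth", None)
    if token is None:
        raise PermissionError("request carries no validated token")

    data = dict(token.payload)
    # Expose the configured claim under the key the view expects
    data.setdefault("user_id", data.get(user_id_claim))
    return data


# Stand-in objects so the sketch can be exercised without Django
fake_request = SimpleNamespace(auth=SimpleNamespace(payload={"sub": "42"}))
token_data = get_token_data(fake_request)
```

Reading the claims from the already validated token avoids decoding the Authorization header a second time inside the view.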
from urllib.parse import urlencode

import requests
from django.shortcuts import redirect


def oidc_login(request):
    """Endpoint that initiates the OpenID Connect authentication process."""
    # Configuration variables
    authorization_endpoint = 'https://your-auth-server.com/authorize'
    client_id = 'your-client-id'
    redirect_uri = 'http://localhost:8000/oidc/callback'
    scope = 'openid profile email'

    # Build authorization URL; urlencode escapes the spaces in the scope
    # and the special characters in the redirect URI
    params = {
        'response_type': 'code',
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'scope': scope,
    }
    auth_url = f'{authorization_endpoint}?{urlencode(params)}'

    # Redirect to authorization URL
    return redirect(auth_url)


def get_user_info_from_oidc(authorization_code):
    """Function that retrieves the token data from the OpenID Connect server."""
    # Configuration variables
    token_endpoint = 'https://your-auth-server.com/token'
    client_id = 'your-client-id'
    client_secret = 'your-client-secret'
    redirect_uri = 'http://localhost:8000/oidc/callback'

    # Build token request
    data = {
        'grant_type': 'authorization_code',
        'code': authorization_code,
        'client_id': client_id,
        'client_secret': client_secret,
        'redirect_uri': redirect_uri,
    }

    # Send token request; fail loudly on HTTP errors instead of parsing them
    response = requests.post(token_endpoint, data=data, timeout=10)
    response.raise_for_status()

    # Return token data
    return response.json()
Note that the oidc_login function uses configuration variables such as the authorization endpoint, client ID, redirect URI, and scope, which should be customized for your specific OpenID Connect authentication setup. Similarly, the get_user_info_from_oidc function uses the token endpoint, client ID, client secret, redirect URI, and grant type that are specific to your OpenID Connect server.
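One thing the login flow above leaves out is the state parameter, which OAuth 2.0 recommends for protecting the callback against cross-site request forgery. The sketch below shows one way it could be added using only the standard library. The function names and the example endpoint are placeholders, and it is assumed that the state value is stored somewhere tied to the user (for example the Django session) between the login redirect and the callback.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def new_state() -> str:
    """Generate an unguessable value to round-trip through the provider."""
    return secrets.token_urlsafe(32)


def build_authorization_url(authorization_endpoint, client_id, redirect_uri, scope, state):
    """Build the authorization URL with every parameter URL-encoded."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
    }
    return f"{authorization_endpoint}?{urlencode(params)}"


def state_matches(expected, received) -> bool:
    """Compare the stored state with the one returned to the callback."""
    if not expected or not received:
        return False
    # Encode to bytes so compare_digest also accepts non-ASCII input
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))


state = new_state()
auth_url = build_authorization_url(
    "https://your-auth-server.com/authorize",  # placeholder endpoint
    "your-client-id",
    "http://localhost:8000/oidc/callback",
    "openid profile email",
    state,
)
query = parse_qs(urlsplit(auth_url).query)
```

In the callback, the code should be exchanged for tokens only when state_matches returns True for the stored value and the state query parameter the provider sent back.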