Building a Scalable App with FastAPI, Redis Queue, and RQ Dashboard

In today’s data-driven world, the need for efficient and scalable data-conversion tools is ever-growing. In this article, we explore how to build a robust JSON-to-YAML converter using FastAPI, Redis Queue (RQ), and RQ Dashboard. This combination gives us asynchronous processing, job monitoring, and easy scalability.

The Architecture

Our application consists of three main components:

  1. A FastAPI web server that handles HTTP requests
  2. A Redis Queue for managing background jobs
  3. An RQ Dashboard for monitoring job status

Let’s dive into the code and explore each part in detail.

Setting Up the Environment
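
First, install the required packages: fastapi, rq, redis, python-multipart, and rq-dashboard-fast, plus uvicorn to serve the app and PyYAML for the JSON-to-YAML conversion itself:

# install the dependencies
pip install fastapi rq redis python-multipart rq-dashboard-fast uvicorn pyyaml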

With the dependencies installed, we import the necessary libraries and set up our FastAPI application:

from fastapi import FastAPI, UploadFile, HTTPException
from fastapi.responses import StreamingResponse
from redis import Redis
from rq import Queue
import io, yaml, json, uuid
from config import settings
from rq_dashboard_fast import RedisQueueDashboard

app = FastAPI()

# build the Redis URL from the same settings used for the connection below
redis_url = f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}"

# mount the RQ Dashboard UI at /rq
dashboard = RedisQueueDashboard(redis_url, "/rq")
app.mount("/rq", dashboard)

# Redis connection and job queue shared by the API and the task function
redis_conn = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
q = Queue(connection=redis_conn)

Here, we’re initializing our FastAPI app, setting up a Redis connection, and creating a Queue for our background jobs. We’re also mounting the RQ Dashboard to our FastAPI app for easy job monitoring.
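
The code imports a settings object from a config module that isn't shown here. A minimal sketch of what it might look like, assuming pydantic-settings (any settings mechanism exposing REDIS_HOST and REDIS_PORT attributes would work just as well):

# config.py - a minimal sketch; only the fields used above are defined
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379

settings = Settings()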

The Core Conversion Function

The heart of our application is the json_to_yaml_task function:

def json_to_yaml_task(contents):
    """Convert raw JSON bytes to YAML and stash the result in Redis."""
    try:
        json_data = json.loads(contents)
        yaml_data = yaml.dump(json_data)
        # store the YAML under a fresh key so the download endpoint can fetch it later
        unique_key = str(uuid.uuid4())
        redis_conn.set(unique_key, yaml_data)
        return {"unique_key": unique_key}
    except Exception as e:
        return {"error": str(e)}

This function takes JSON content, converts it to YAML, stores the result in Redis under a unique key, and returns that key. If an error occurs, it returns an error message instead.
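
To make the behavior concrete, here is what the task produces for a small payload (the key shown is illustrative; yours will differ):

sample = b'{"name": "converter", "replicas": 3}'
result = json_to_yaml_task(sample)
# result -> {"unique_key": "e.g. 1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed"}
# and the YAML stored in Redis under that key is:
#   name: converter
#   replicas: 3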

API Endpoints

We have three main endpoints:

  1. Convert JSON to YAML
@app.post("/api/v1/convert/json")
async def convert_to_yaml(file: UploadFile):
    contents = await file.read()
    job = q.enqueue(json_to_yaml_task, contents)
    return {"job_id": job.id}

This endpoint accepts a JSON file, enqueues a conversion job, and returns a job ID.

  2. Check Job Status
@app.get("/api/v1/job_status/{job_id}")
async def get_job_status(job_id: str):
    job = q.fetch_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"status": job.get_status(), "result": job.result}

This endpoint allows clients to check the status of a job using its ID.

  3. Download Converted YAML
@app.get("/api/v1/download/{job_id}")
async def download_file(job_id: str):
    job = q.fetch_job(job_id)
    if job is None or job.result is None:
        raise HTTPException(status_code=404, detail="Task not found or not completed")
    unique_key = job.result.get("unique_key")
    if unique_key is None:
        raise HTTPException(status_code=500, detail="Unique key not found in job result")
    yaml_data = redis_conn.get(unique_key)
    if yaml_data is None:
        raise HTTPException(status_code=404, detail="YAML data not found in Redis")
    return StreamingResponse(
        io.BytesIO(yaml_data),
        media_type="application/x-yaml",
        headers={"Content-Disposition": f"attachment; filename=converted.yaml"}
    )

This endpoint retrieves the converted YAML data and returns it as a downloadable file.
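
With all three endpoints in place, a full round trip from the command line looks like this (the job ID is a placeholder; use the one returned by the first call):

# 1. upload a JSON file for conversion
curl -X POST -F "file=@data.json" http://localhost:8000/api/v1/convert/json
# -> {"job_id":"<job-id>"}

# 2. poll the job status until it reports "finished"
curl http://localhost:8000/api/v1/job_status/<job-id>

# 3. download the converted YAML (saved as converted.yaml)
curl -o converted.yaml http://localhost:8000/api/v1/download/<job-id>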

Next, we need a worker to process jobs from the queue. This can be done with the rq CLI or from a Python file, `worker.py`, as shown below.

from redis import Redis
from rq import Worker, Queue
from config import settings

# Set up the Redis connection shared with the FastAPI app
redis_conn = Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)

# Listen on the 'default' queue, the one our endpoints enqueue to
q = Queue('default', connection=redis_conn)

if __name__ == '__main__':
    # Create and start the worker
    worker = Worker([q], connection=redis_conn)
    worker.work()
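
Equivalently, the worker can be started with the rq CLI instead of worker.py (assuming Redis runs on localhost):

# same effect as python worker.py
rq worker --url redis://localhost:6379 default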

Now we run the three services: the Redis server, the FastAPI server, and the RQ worker.

# run redis server
redis-server

# run fastapi
uvicorn main:app --port 8000 --reload

# run worker for rq
python worker.py

When a job is added to the queue, we get back a unique job ID, which we can then use to track the job’s status.
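
For example, polling the status endpoint typically moves through responses like these (values are illustrative; RQ reports statuses such as queued, started, and finished):

{"status": "queued", "result": null}
{"status": "started", "result": null}
{"status": "finished", "result": {"unique_key": "1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed"}}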

Monitoring the Queue Using RQ Dashboard

To visualize the statuses of our jobs, we use rq-dashboard. There is an rq-dashboard package built for FastAPI (rq-dashboard-fast), which is what we mounted earlier at /rq; once the app is running, the dashboard is available at http://localhost:8000/rq.

RQ Dashboard offers a UI and level of insight similar to Celery’s Flower UI.

This architecture offers several benefits:

  1. Scalability: By using Redis Queue, we can easily scale our application to handle a large number of conversion requests.
  2. Asynchronous Processing: Long-running conversions don’t block the web server, improving responsiveness.
  3. Monitoring: The RQ Dashboard provides real-time insights into job statuses and queue health.
  4. Fault Tolerance: Failed jobs can be retried, and the system can recover from crashes without losing data (see the sketch after this list).
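
On the fault-tolerance point, RQ supports automatic retries at enqueue time (available since RQ 1.5); a minimal sketch of how our convert endpoint could opt in:

from rq import Retry

# retry a failed conversion up to 3 times, waiting 10 seconds between attempts
job = q.enqueue(json_to_yaml_task, contents, retry=Retry(max=3, interval=10))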

This FastAPI application demonstrates a powerful, scalable approach to building a JSON to YAML converter. By leveraging Redis Queue for background processing and RQ Dashboard for monitoring, we’ve created a robust system that can handle high loads and provide real-time feedback to users.

The combination of FastAPI’s speed and ease of use, Redis Queue’s reliability, and RQ Dashboard’s monitoring capabilities makes this architecture suitable for a wide range of data processing applications beyond just JSON to YAML conversion.

As you build and scale your own data processing applications, consider adopting similar patterns to achieve high performance, reliability, and maintainability.


Happy Coding

Jesus Saves

By Jesse E. Agbe (JCharis)
