
LLM-Driven API Orchestration using LangChain + Celery + Redis Queue


Many modern applications need LLMs to decide which backend workflows to trigger. But calling APIs directly from an LLM response doesn't scale when tasks are long-running, asynchronous, or queue-driven.

In this blog, we’ll show how to orchestrate LLM-triggered background APIs using:

  • LangChain for tool-aware agents
  • Celery for background task execution
  • Redis as the task broker
  • FastAPI + WebSocket for real-time feedback

Architecture Overview

User ➝ FastAPI ➝ LangChain Agent ➝ Redis Queue ➝ Celery Task ➝ API Call ➝ DB

Use Case: LLM-Driven Task Selector

User sends a natural language request like:

“Download sales report and notify marketing”

The LLM parses intent → maps to tools → triggers backend workflows using Celery.

Step 1: Install Dependencies

pip install langchain openai celery redis fastapi uvicorn

Or pin the same packages in requirements.txt: langchain, openai, celery, redis, fastapi, uvicorn

Step 2: Define Tools for the LangChain Agent

from langchain.agents import tool

# The @tool decorator turns each function into a LangChain Tool; the docstring
# becomes the tool description the agent uses to pick it (and is required).
@tool
def download_sales_report():
    """Kick off a download of the latest sales report."""
    return "sales-report-task"

@tool
def notify_team(team: str):
    """Send a notification to the given team."""
    return f"notify-task:{team}"

tools = [download_sales_report, notify_team]
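Note that these tools don't do the work themselves; they only return task identifiers that we hand to Celery in Step 4. As a quick sanity check (a sketch, not part of the agent flow), you can invoke a tool directly:

# Calling the tool outside the agent to confirm it returns the task name
print(notify_team.run("marketing"))   # → notify-task:marketing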

Step 3: LLM Agent Generates Tool Instructions

from langchain.agents import initialize_agent
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)

result = agent.run("Download sales report and notify marketing team")
# Tool outputs produced along the way: "sales-report-task", "notify-task:marketing"
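agent.run() only returns the final answer string. If you want the actual tool outputs so they can be enqueued, one option (a sketch using LangChain's return_intermediate_steps flag) is:

# Re-create the agent so it also returns the (action, observation) pairs
agent = initialize_agent(
    tools,
    llm,
    agent="zero-shot-react-description",
    return_intermediate_steps=True,
    verbose=True,
)

response = agent({"input": "Download sales report and notify marketing team"})
task_names = [observation for _action, observation in response["intermediate_steps"]]
# task_names → ["sales-report-task", "notify-task:marketing"]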

Step 4: Enqueue Task to Celery

from celery import Celery

celery_app = Celery('tasks', broker='redis://localhost:6379/0')

@celery_app.task
def handle_task(task_name):
    print(f"Executing {task_name}")
    # Simulate work (or call your backend service here)
    return f"Completed: {task_name}"

# In a FastAPI route:
handle_task.delay("sales-report-task")
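Here is one way to glue Steps 3 and 4 together in a FastAPI route. This is a sketch: the /run endpoint and request model are illustrative, and it assumes the agent built with return_intermediate_steps above plus the handle_task task from this step.

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class TaskRequest(BaseModel):
    prompt: str

@app.post("/run")
def run_tasks(req: TaskRequest):
    # Let the agent resolve the prompt into tool outputs (task names)...
    response = agent({"input": req.prompt})
    task_names = [obs for _action, obs in response["intermediate_steps"]]
    # ...then enqueue each one to Celery via Redis
    for name in task_names:
        handle_task.delay(name)
    return {"queued": task_names}

Run a Celery worker in another terminal so queued tasks actually execute (assuming the Celery app lives in tasks.py): celery -A tasks worker --loglevel=info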

Step 5: FastAPI + WebSocket UI for Live Updates

from fastapi import FastAPI, WebSocket
import asyncio

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    for i in range(5):
        await asyncio.sleep(1)
        await ws.send_text(f"Progress {i}/5")
    await ws.send_text("Task completed")

Connect this to a frontend using JavaScript:

const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (msg) => console.log(msg.data);
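The endpoint above only simulates progress. One way to push real status (a sketch, not a fixed recipe) is to have the Celery task publish updates to a Redis channel and let the WebSocket endpoint relay them; the channel name "task-progress" is illustrative, and celery_app and app come from the earlier steps.

import redis
import redis.asyncio as aioredis

# Worker side: publish progress to a Redis channel from inside the Celery task
progress_redis = redis.Redis(host="localhost", port=6379, db=0)

@celery_app.task
def handle_task(task_name):
    progress_redis.publish("task-progress", f"Started: {task_name}")
    # ... call your backend service here ...
    progress_redis.publish("task-progress", f"Completed: {task_name}")
    return f"Completed: {task_name}"

# API side: relay anything published on the channel to the WebSocket client
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    r = aioredis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    pubsub = r.pubsub()
    await pubsub.subscribe("task-progress")
    async for message in pubsub.listen():
        if message["type"] == "message":
            await ws.send_text(message["data"])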

Full Flow Example

1. User sends a natural-language task ➝ FastAPI
2. FastAPI calls the LangChain agent
3. Agent resolves tools → "sales-report-task", "notify-task:marketing"
4. FastAPI enqueues these tasks to Celery
5. Worker executes the APIs in the background
6. WebSocket streams live updates back to the user

Bonus: Track Task ID + Result

from uuid import uuid4
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)

@celery_app.task
def handle_task(task_name, task_id):
    print(f"Executing {task_name}")
    # Store the result under the task_id so it can be fetched later
    redis_client.set(task_id, f"Completed: {task_name}")
    return task_id

# Enqueue with a generated ID:
task_id = str(uuid4())
handle_task.delay("sales-report-task", task_id)
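To surface that result to the browser, the WebSocket endpoint can look the task up by ID. This is a sketch: the /ws/{task_id} path is illustrative, and it reuses redis_client from above plus app, WebSocket, and asyncio from Step 5.

@app.websocket("/ws/{task_id}")
async def task_status(ws: WebSocket, task_id: str):
    await ws.accept()
    # Poll Redis until the Celery task has written its result
    while True:
        result = redis_client.get(task_id)
        if result is not None:
            await ws.send_text(result.decode())
            break
        await ws.send_text("Still running...")
        await asyncio.sleep(1)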

Benefits of This Pattern

  • Scalable: Offloads long-running tasks from the API process
  • Asynchronous: Enables concurrent workflows
  • LLM-driven: Prompts become execution logic
  • Live updates: Improves user feedback experience

Conclusion

LLM + Celery + Redis forms a powerful orchestration stack:

  • LangChain agents parse intent
  • Tasks queue into Celery
  • The WebSocket layer adds interactivity
