Many modern applications need LLMs to decide and trigger backend workflows. But calling APIs directly from an LLM response isn’t scalable when tasks are long-running, asynchronous, or queue-driven.
In this blog, we’ll show how to orchestrate LLM-triggered background APIs using:
- LangChain for tool-aware agents
- Celery for background task execution
- Redis as the task broker
- FastAPI + WebSocket for real-time feedback
Architecture Overview
User ➝ FastAPI ➝ LangChain Agent ➝ Redis Queue ➝ Celery Task ➝ API Call ➝ DB
Use Case: LLM-Driven Task Selector
User sends a natural language request like:
“Download sales report and notify marketing”
The LLM parses intent → maps to tools → triggers backend workflows using Celery.
Step 1: Install Dependencies
pip install langchain openai celery redis fastapi uvicorn
Create requirements.txt: langchain ,openai , celery ,redis ,fastapi ,uvicorn
Step 2: Define Tools with LangChain Agent
from langchain.agents import tool
@tool
def download_sales_report():
return "sales-report-task"
@tool
def notify_team(team: str):
return f"notify-task:{team}"
tools = [download_sales_report, notify_team]
Step 3: LLM Agent Generates Tool Instructions
from langchain.agents import initialize_agent
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
agent = initialize_agent(tools, llm, agent="zero-shot-react-description", verbose=True)
result = agent.run("Download sales report and notify marketing team")
# → "sales-report-task", "notify-task:marketing"
Step 4: Enqueue Task to Celery
from celery import Celery
import redis
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
@celery_app.task
def handle_task(task_name):
print(f"Executing {task_name}")
# Simulate work (or call your backend service)
return f"Completed: {task_name}"
# In FastAPI route:
handle_task.delay("sales-report-task")
Step 5: FastAPI + WebSocket UI for Live Updates
from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
for i in range(5):
await asyncio.sleep(1)
await ws.send_text(f"Progress {i}/5")
await ws.send_text("Task completed")
Connect this to a frontend using JavaScript:
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = (msg) => console.log(msg.data);
Full Flow Example
1. User sends natural task ➝ FastAPI
2. FastAPI calls LangChain agent
3. Agent resolves tools → “download-report”, “notify-team”
4. FastAPI sends these tasks to Celery
5. Worker executes APIs in background
6. WebSocket returns live updates
Bonus: Track Task ID + Result
from uuid import uuid4
@celery_app.task
def handle_task(task_name, task_id):
print(f"Executing {task_name}")
redis.set(task_id, f"Completed: {task_name}")
return task_id
Benefits of This Pattern
- Scalable: Offloads long-running tasks from API
- Asynchronous: Enables concurrent workflows
- LLM-driven: Prompts become execution logic
- Live updates: Improves user feedback experience
Conclusion
LLM + Celery + Redis forms a powerful orchestration stack:
- LangChain agents parse intent
- Tasks queue into Celery
WebSocket layer adds interactivity
