Learn how to queue OpenAI API requests effectively using rate limiting and queue systems like Redis, RabbitMQ, or AWS SQS to handle high user traffic and avoid rate limit errors.
To queue OpenAI requests when too many users hit the workflow at once, implement a request rate limiter using a technique such as the token bucket or leaky bucket algorithm, or use a queue system like Redis, RabbitMQ, or AWS SQS. This prevents API rate limit errors and ensures efficient processing by managing the flow of requests so it stays within OpenAI's rate limits.
Step 1: Understanding OpenAI's Rate Limits
Before implementing a queue system, it's important to understand OpenAI's rate limits. These limits vary depending on your subscription tier and the specific models you're using. Rate limits are typically measured in:
- Requests per minute (RPM): how many API calls you can make in a minute
- Tokens per minute (TPM): how many tokens (prompt plus completion) you can process in a minute
- For some models and tiers, requests per day (RPD)
For example, at the time of writing, free tier users might have limits like 3 RPM and 40,000 TPM for GPT-3.5-Turbo, while paid tiers have higher limits.
You can find the current rate limits in your OpenAI account dashboard or in their API documentation. Understanding these limits is crucial for designing an effective queuing system.
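A quick way to check the limits that actually apply to your key is to read the rate-limit headers OpenAI includes on API responses. Below is a minimal sketch using the same openai v3 SDK as the rest of this guide; the x-ratelimit-* header names are OpenAI's documented response headers, but treat the exact set as subject to change:
const { Configuration, OpenAIApi } = require("openai");
const openai = new OpenAIApi(new Configuration({ apiKey: process.env.OPENAI_API_KEY }));

async function logRateLimits() {
  // The v3 SDK returns an axios response, so HTTP headers are available alongside `data`
  const response = await openai.createCompletion({
    model: "text-davinci-003",
    prompt: "Say hello",
    max_tokens: 5
  });
  console.log("Requests remaining this window:", response.headers["x-ratelimit-remaining-requests"]);
  console.log("Tokens remaining this window:", response.headers["x-ratelimit-remaining-tokens"]);
}

logRateLimits().catch(console.error);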
Step 2: Choose a Queuing Approach
There are several approaches to implement a queuing system:
- An in-memory queue inside your application process (simple, but lost on restart and limited to a single server)
- A Redis-backed job queue such as Bull (persistent and shared across servers)
- A dedicated message broker like RabbitMQ using a producer/consumer pattern
- A managed cloud queue such as AWS SQS
For most production applications, Redis or a dedicated message broker is recommended for reliability and scalability.
Step 3: Setting Up a Basic In-Memory Queue with Node.js
Let's start with a simple in-memory queue implementation using Node.js:
const { Configuration, OpenAIApi } = require("openai");
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Queue implementation
class RequestQueue {
constructor(maxRequestsPerMinute) {
this.queue = [];
this.processing = false;
this.maxRequestsPerMinute = maxRequestsPerMinute;
this.requestsThisMinute = 0;
this.resetTime = Date.now() + 60000; // Reset after a minute
}
async add(prompt, model, res) {
return new Promise((resolve, reject) => {
this.queue.push({
prompt,
model,
res,
resolve,
reject
});
if (!this.processing) {
this.processQueue();
}
});
}
async processQueue() {
if (this.queue.length === 0) {
this.processing = false;
return;
}
this.processing = true;
// Check if we need to reset the counter
if (Date.now() > this.resetTime) {
this.requestsThisMinute = 0;
this.resetTime = Date.now() + 60000;
}
// Check if we've hit the rate limit
if (this.requestsThisMinute >= this.maxRequestsPerMinute) {
const waitTime = this.resetTime - Date.now();
console.log(`Rate limit reached. Waiting ${waitTime}ms before next request.`);
setTimeout(() => this.processQueue(), waitTime);
return;
}
// Process the next item in the queue
const { prompt, model, res, resolve, reject } = this.queue.shift();
this.requestsThisMinute++;
try {
const completion = await openai.createCompletion({
model: model || "text-davinci-003",
prompt: prompt,
max_tokens: 1000
});
// If you're using this in an Express app, you might want to send the response
if (res) {
res.json({ result: completion.data.choices[0].text });
}
resolve(completion.data.choices[0].text);
} catch (error) {
console.error("Error calling OpenAI API:", error);
if (res) {
res.status(500).json({ error: "Failed to process request" });
}
reject(error);
}
// Process the next item after a small delay to prevent overloading
setTimeout(() => this.processQueue(), 100);
}
}
// Create a queue with a rate limit of 60 requests per minute
const requestQueue = new RequestQueue(60);
// Express route example
const express = require('express');
const app = express();
app.use(express.json());
app.post('/generate-text', async (req, res) => {
try {
const { prompt, model } = req.body;
await requestQueue.add(prompt, model, res);
// Note: response is handled inside the queue processing
} catch (error) {
console.error("Failed to add request to queue:", error);
res.status(500).json({ error: "Failed to process request" });
}
});
app.listen(3000, () => console.log('Server running on port 3000'));
This implementation creates a basic queue that respects rate limits and processes requests in the order they were received.
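The queue isn't tied to Express; you can also await it directly from any server-side code. Here is a minimal usage sketch against the requestQueue instance above (the prompt is just an illustration, and null is passed because there is no HTTP response to write):
async function runExample() {
  // No Express response object here, so pass null for `res`
  const text = await requestQueue.add(
    "Write a haiku about request queues",
    "text-davinci-003",
    null
  );
  console.log("Generated text:", text);
}

runExample().catch(console.error);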
Step 4: Implementing a Redis-Based Queue
For a more robust solution that works across multiple servers, let's implement a Redis-based queue:
First, install the necessary packages:
npm install redis bull dotenv openai express
Then create a queuing system using Bull:
require('dotenv').config();
const express = require('express');
const Bull = require('bull');
const { Configuration, OpenAIApi } = require("openai");
// Initialize Express
const app = express();
app.use(express.json());
// Initialize OpenAI
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Create a Bull queue
const openaiQueue = new Bull('openai-requests', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
},
limiter: {
max: 60, // Maximum number of jobs processed per minute
duration: 60000 // 1 minute in milliseconds
}
});
// Process the queue
openaiQueue.process(async (job) => {
const { prompt, model, userId } = job.data;
try {
console.log(`Processing job ${job.id} for user ${userId}`);
const completion = await openai.createCompletion({
model: model || "text-davinci-003",
prompt: prompt,
max_tokens: 1000
});
return { result: completion.data.choices[0].text };
} catch (error) {
console.error(`Error processing job ${job.id}:`, error.message);
throw new Error(`Failed to process OpenAI request: ${error.message}`);
}
});
// Express route to add jobs to the queue
app.post('/api/generate', async (req, res) => {
const { prompt, model, userId } = req.body;
if (!prompt) {
return res.status(400).json({ error: 'Prompt is required' });
}
try {
// Add the job to the queue
const job = await openaiQueue.add({
prompt,
model,
userId: userId || 'anonymous'
}, {
removeOnComplete: true, // Remove jobs from queue when completed
attempts: 3, // Retry failed jobs up to 3 times
backoff: {
type: 'exponential',
delay: 1000 // Initial delay before retrying in milliseconds
}
});
// Return job ID immediately so client can poll for results
res.json({
message: 'Request queued successfully',
jobId: job.id
});
// Alternative: wait for job to complete and return results
// const result = await job.finished();
// res.json(result);
} catch (error) {
console.error('Failed to queue request:', error);
res.status(500).json({ error: 'Failed to queue request' });
}
});
// Route to check job status
app.get('/api/status/:jobId', async (req, res) => {
const { jobId } = req.params;
try {
const job = await openaiQueue.getJob(jobId);
if (!job) {
return res.status(404).json({ error: 'Job not found' });
}
const state = await job.getState();
const result = job.returnvalue;
res.json({
id: job.id,
state,
result: state === 'completed' ? result : null,
reason: job.failedReason
});
} catch (error) {
console.error('Error getting job status:', error);
res.status(500).json({ error: 'Failed to get job status' });
}
});
// Handle queue events for monitoring
openaiQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
openaiQueue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed with error:`, error.message);
});
// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
Step 5: Implementing a Client-Side Solution
For client-side implementation, add retry logic and feedback for users:
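A minimal browser-side sketch, assuming the /api/generate endpoint from Step 4 and illustrative element ids ("prompt", "generate-btn", "status") in your markup: it retries on 429 and 5xx responses with exponential backoff and keeps the user informed while the request is queued:
// Browser-side sketch: submit a prompt with retry logic and simple user feedback.
async function generateWithRetry(prompt, maxRetries = 3) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const response = await fetch('/api/generate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ prompt })
    });
    if (response.ok) {
      return response.json(); // { message, jobId }
    }
    // Retry on rate limiting or transient server errors with exponential backoff
    if ((response.status === 429 || response.status >= 500) && attempt < maxRetries) {
      const delay = 1000 * Math.pow(2, attempt);
      document.getElementById('status').textContent = `Server busy, retrying in ${delay / 1000}s...`;
      await new Promise(resolve => setTimeout(resolve, delay));
      continue;
    }
    throw new Error(`Request failed with status ${response.status}`);
  }
}

document.getElementById('generate-btn').addEventListener('click', async () => {
  const prompt = document.getElementById('prompt').value;
  const status = document.getElementById('status');
  status.textContent = 'Submitting request...';
  try {
    const { jobId } = await generateWithRetry(prompt);
    status.textContent = `Request queued (job ${jobId}). Poll /api/status/${jobId} for the result.`;
  } catch (error) {
    status.textContent = `Error: ${error.message}`;
  }
});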
Step 6: Advanced Queue Management with RabbitMQ
For even more scalable queue management, you can use RabbitMQ. First, install the required packages:
npm install amqplib uuid openai express dotenv
Then implement a producer-consumer pattern:
// producer.js - API server that receives client requests
require('dotenv').config();
const express = require('express');
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');
const app = express();
app.use(express.json());
// Store for mapping correlation IDs to response objects
const pendingRequests = new Map();
// Connect to RabbitMQ
async function connectRabbitMQ() {
try {
const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
const channel = await connection.createChannel();
// Declare the request queue
await channel.assertQueue('openai_requests', {
durable: true // Queue survives broker restart
});
// Declare the response queue
await channel.assertQueue('openai_responses', {
durable: true
});
// Set up consumer for responses
await channel.consume('openai_responses', async (msg) => {
if (msg) {
const correlationId = msg.properties.correlationId;
const responseData = JSON.parse(msg.content.toString());
// Find the pending request
const pendingRequest = pendingRequests.get(correlationId);
if (pendingRequest) {
const { res, timer } = pendingRequest;
clearTimeout(timer); // Clear the timeout
// Send the response to the client
if (responseData.error) {
res.status(500).json({ error: responseData.error });
} else {
res.json(responseData);
}
// Remove from pending requests
pendingRequests.delete(correlationId);
}
channel.ack(msg);
}
});
return channel;
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
throw error;
}
}
// Initialize RabbitMQ connection
let rabbitChannel;
connectRabbitMQ()
.then(channel => {
rabbitChannel = channel;
console.log('Connected to RabbitMQ');
})
.catch(err => {
console.error('Failed to connect to RabbitMQ:', err);
process.exit(1);
});
// API route to handle text generation requests
app.post('/api/generate', async (req, res) => {
if (!rabbitChannel) {
return res.status(503).json({ error: 'Service unavailable. Queue system not ready.' });
}
const { prompt, model, userId } = req.body;
if (!prompt) {
return res.status(400).json({ error: 'Prompt is required' });
}
try {
// Generate a unique correlation ID for this request
const correlationId = uuidv4();
// Create a timeout to handle cases where no response is received
const timer = setTimeout(() => {
if (pendingRequests.has(correlationId)) {
pendingRequests.get(correlationId).res.status(504).json({
error: 'Request timed out while waiting for a response'
});
pendingRequests.delete(correlationId);
}
}, 30000); // 30 second timeout
// Store the response object and timer for later use
pendingRequests.set(correlationId, { res, timer });
// Publish the request to the queue
rabbitChannel.sendToQueue('openai_requests',
Buffer.from(JSON.stringify({
prompt,
model: model || 'text-davinci-003',
userId: userId || 'anonymous',
timestamp: Date.now()
})),
{
persistent: true, // Message survives broker restart
correlationId,
replyTo: 'openai_responses'
}
);
// Note: We don't respond here. The response will be sent
// when we receive a message on the response queue
} catch (error) {
console.error('Error queuing request:', error);
res.status(500).json({ error: 'Failed to queue request' });
}
});
// Health check endpoint
app.get('/health', (req, res) => {
res.json({ status: 'ok', queueConnected: !!rabbitChannel });
});
// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Producer server running on port ${PORT}`);
});
Next, create the consumer to process queued requests:
// consumer.js - Worker that processes OpenAI requests
require('dotenv').config();
const amqp = require('amqplib');
const { Configuration, OpenAIApi } = require("openai");
// Initialize OpenAI
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Rate limit tracking
const rateLimiter = {
requestsThisMinute: 0,
lastResetTime: Date.now(),
maxRequestsPerMinute: parseInt(process.env.MAX_REQUESTS_PER_MINUTE || '60'),
canMakeRequest() {
const now = Date.now();
// Reset counter if a minute has passed
if (now - this.lastResetTime > 60000) {
this.requestsThisMinute = 0;
this.lastResetTime = now;
}
return this.requestsThisMinute < this.maxRequestsPerMinute;
},
incrementCounter() {
this.requestsThisMinute++;
}
};
// Connect to RabbitMQ and process messages
async function start() {
try {
// Connect to RabbitMQ
const connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
const channel = await connection.createChannel();
// Declare the request queue
await channel.assertQueue('openai_requests', {
durable: true
});
// Declare the response queue
await channel.assertQueue('openai_responses', {
durable: true
});
// Set prefetch to limit the number of unacknowledged messages
// This prevents the worker from being overwhelmed
channel.prefetch(1);
console.log('Consumer waiting for messages...');
// Process messages from the queue
await channel.consume('openai_requests', async (msg) => {
if (msg) {
const { correlationId, replyTo } = msg.properties;
const requestData = JSON.parse(msg.content.toString());
console.log(`Processing request ${correlationId} from ${requestData.userId}`);
try {
// Check rate limits
if (!rateLimiter.canMakeRequest()) {
console.log('Rate limit reached, waiting...');
// Requeue the message with a delay
setTimeout(() => {
channel.nack(msg, false, true);
}, 5000); // Wait 5 seconds before requeuing
return;
}
// Increment the rate limit counter
rateLimiter.incrementCounter();
// Process the OpenAI request
const completion = await openai.createCompletion({
model: requestData.model,
prompt: requestData.prompt,
max_tokens: 1000
});
// Send the response back
channel.sendToQueue(
replyTo,
Buffer.from(JSON.stringify({
result: completion.data.choices[0].text,
model: requestData.model,
processingTime: Date.now() - requestData.timestamp
})),
{ correlationId }
);
// Acknowledge the message
channel.ack(msg);
} catch (error) {
console.error(`Error processing request ${correlationId}:`, error);
// Send error response
channel.sendToQueue(
replyTo,
Buffer.from(JSON.stringify({
error: `Failed to process request: ${error.message}`,
processingTime: Date.now() - requestData.timestamp
})),
{ correlationId }
);
// Acknowledge the message to remove it from the queue
// In a production system, you might want to handle certain errors differently
channel.ack(msg);
}
}
});
// Handle process termination
process.on('SIGINT', async () => {
await channel.close();
await connection.close();
process.exit(0);
});
} catch (error) {
console.error('Consumer error:', error);
process.exit(1);
}
}
start();
Step 7: Implementing AWS SQS for Cloud-Based Queuing
For AWS-based applications, SQS provides a reliable queuing service:
// Install required packages
// npm install aws-sdk uuid openai express dotenv
// aws-sqs-producer.js
require('dotenv').config();
const express = require('express');
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');
// Configure AWS
AWS.config.update({
region: process.env.AWS_REGION || 'us-east-1',
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
});
const sqs = new AWS.SQS();
const requestQueueUrl = process.env.SQS_REQUEST_QUEUE_URL;
const app = express();
app.use(express.json());
// Route to handle text generation requests
app.post('/api/generate', async (req, res) => {
const { prompt, model, userId } = req.body;
if (!prompt) {
return res.status(400).json({ error: 'Prompt is required' });
}
try {
// Generate a unique ID for this request
const requestId = uuidv4();
// Send message to SQS queue
await sqs.sendMessage({
QueueUrl: requestQueueUrl,
MessageBody: JSON.stringify({
prompt,
model: model || 'text-davinci-003',
userId: userId || 'anonymous',
timestamp: Date.now()
}),
MessageAttributes: {
'RequestId': {
DataType: 'String',
StringValue: requestId
}
},
// MessageGroupId is required for FIFO queues only
// MessageGroupId: userId || 'default'
}).promise();
// Return the request ID to the client for status checking
res.json({
message: 'Request queued successfully',
requestId
});
} catch (error) {
console.error('Error sending message to SQS:', error);
res.status(500).json({ error: 'Failed to queue request' });
}
});
// Route to check status (in a real application, you'd store results in DynamoDB or similar)
app.get('/api/status/:requestId', (req, res) => {
// In a real implementation, you would check a database for the result
res.json({
message: 'Status checking not implemented in this example',
requestId: req.params.requestId
});
});
// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`SQS Producer server running on port ${PORT}`);
});
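Because the consumer shown next writes each result to DynamoDB, the status check can simply read that table. Here is a minimal sketch you could add to the producer; the /api/result route name is illustrative, and it assumes the same DYNAMODB_RESULTS_TABLE the consumer uses, keyed on requestId:
// Sketch of a status route backed by the results table the consumer writes to
const dynamodb = new AWS.DynamoDB.DocumentClient();
const resultsTable = process.env.DYNAMODB_RESULTS_TABLE;

app.get('/api/result/:requestId', async (req, res) => {
  try {
    const data = await dynamodb.get({
      TableName: resultsTable,
      Key: { requestId: req.params.requestId }
    }).promise();
    if (!data.Item) {
      // No record yet: the worker has not finished (or even started) this request
      return res.json({ status: 'pending', requestId: req.params.requestId });
    }
    res.json(data.Item); // Contains status ('completed' or 'failed') plus result or error
  } catch (error) {
    console.error('Error reading result from DynamoDB:', error);
    res.status(500).json({ error: 'Failed to read request status' });
  }
});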
Now, create the consumer for AWS SQS:
// aws-sqs-consumer.js
require('dotenv').config();
const AWS = require('aws-sdk');
const { Configuration, OpenAIApi } = require("openai");
// Configure AWS
AWS.config.update({
region: process.env.AWS_REGION || 'us-east-1',
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
});
// Initialize AWS services
const sqs = new AWS.SQS();
const dynamodb = new AWS.DynamoDB.DocumentClient();
// Initialize OpenAI
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Queue URLs
const requestQueueUrl = process.env.SQS_REQUEST_QUEUE_URL;
// DynamoDB table for storing results
const resultsTable = process.env.DYNAMODB_RESULTS_TABLE;
// Rate limiter
const rateLimiter = {
requestsThisMinute: 0,
lastResetTime: Date.now(),
maxRequestsPerMinute: parseInt(process.env.MAX_REQUESTS_PER_MINUTE || '60'),
canMakeRequest() {
const now = Date.now();
if (now - this.lastResetTime > 60000) {
this.requestsThisMinute = 0;
this.lastResetTime = now;
}
return this.requestsThisMinute < this.maxRequestsPerMinute;
},
incrementCounter() {
this.requestsThisMinute++;
}
};
// Process messages from SQS
async function processMessages() {
try {
// Receive messages from SQS
const response = await sqs.receiveMessage({
QueueUrl: requestQueueUrl,
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20, // Long polling
MessageAttributeNames: ['All'],
VisibilityTimeout: 30 // 30 seconds
}).promise();
if (!response.Messages || response.Messages.length === 0) {
console.log('No messages received');
setImmediate(processMessages); // Keep polling even when the queue is empty
return;
}
const message = response.Messages[0];
const receiptHandle = message.ReceiptHandle;
const requestData = JSON.parse(message.Body);
const requestId = message.MessageAttributes.RequestId.StringValue;
console.log(`Processing request ${requestId} from ${requestData.userId}`);
try {
// Check rate limits
if (!rateLimiter.canMakeRequest()) {
console.log('Rate limit reached, extending visibility timeout...');
// Extend the visibility timeout to retry later
await sqs.changeMessageVisibility({
QueueUrl: requestQueueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: 10 // Try again in 10 seconds
}).promise();
setImmediate(processMessages); // Keep polling; the message becomes visible again later
return;
}
// Increment rate limit counter
rateLimiter.incrementCounter();
// Process the OpenAI request
const completion = await openai.createCompletion({
model: requestData.model,
prompt: requestData.prompt,
max_tokens: 1000
});
const result = completion.data.choices[0].text;
// Store the result in DynamoDB
await dynamodb.put({
TableName: resultsTable,
Item: {
requestId,
userId: requestData.userId,
result,
model: requestData.model,
timestamp: Date.now(),
status: 'completed'
}
}).promise();
console.log(`Request ${requestId} processed successfully`);
// Delete the message from the queue
await sqs.deleteMessage({
QueueUrl: requestQueueUrl,
ReceiptHandle: receiptHandle
}).promise();
} catch (error) {
console.error(`Error processing request ${requestId}:`, error);
// Store the error in DynamoDB
await dynamodb.put({
TableName: resultsTable,
Item: {
requestId,
userId: requestData.userId,
error: error.message,
timestamp: Date.now(),
status: 'failed'
}
}).promise();
// Delete the message to prevent retries if this is a permanent error
await sqs.deleteMessage({
QueueUrl: requestQueueUrl,
ReceiptHandle: receiptHandle
}).promise();
}
} catch (error) {
console.error('Error in message processing loop:', error);
}
// Continue processing messages
setImmediate(processMessages);
}
// Start processing messages
console.log('SQS Consumer started');
processMessages();
Step 8: Implementing Rate Limiting with Token Bucket Algorithm
The token bucket algorithm is a more sophisticated approach to rate limiting:
class TokenBucket {
constructor(maxTokens, refillRate) {
this.maxTokens = maxTokens; // Maximum tokens the bucket can hold
this.refillRate = refillRate; // Tokens added per millisecond
this.tokens = maxTokens; // Current token count, start full
this.lastRefillTimestamp = Date.now(); // Last refill timestamp
}
refill() {
const now = Date.now();
const timePassed = now - this.lastRefillTimestamp;
const tokensToAdd = timePassed * this.refillRate;
if (tokensToAdd > 0) {
this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
this.lastRefillTimestamp = now;
}
}
take(tokens = 1) {
this.refill();
if (this.tokens >= tokens) {
this.tokens -= tokens;
return true;
}
return false;
}
getWaitTimeInMs(tokens = 1) {
this.refill();
if (this.tokens >= tokens) {
return 0; // No waiting required
}
// Calculate time needed to accumulate required tokens
const tokensNeeded = tokens - this.tokens;
return Math.ceil(tokensNeeded / this.refillRate);
}
}
// Usage example with Express middleware
function createRateLimiterMiddleware(requestsPerMinute, tokensPerRequest = 1) {
// 60000 milliseconds in a minute
const refillRate = requestsPerMinute / 60000;
const bucket = new TokenBucket(requestsPerMinute, refillRate);
return function rateLimiterMiddleware(req, res, next) {
const waitTime = bucket.getWaitTimeInMs(tokensPerRequest);
if (waitTime === 0) {
// Tokens available, consume them
bucket.take(tokensPerRequest);
next();
} else {
// Tokens not available, respond with 429 Too Many Requests
res.status(429).json({
error: 'Too many requests, please try again later',
retryAfter: Math.ceil(waitTime / 1000) // seconds
});
// Alternatively, wait and then process (not recommended for web servers)
// setTimeout(next, waitTime);
}
};
}
// Add the middleware to your Express app
const express = require('express');
const app = express();
app.use('/api/openai', createRateLimiterMiddleware(60)); // 60 requests per minute
Step 9: Advanced Prioritization and Fair Queuing
Implement priority queuing to handle requests based on user tiers:
// Advanced priority queue implementation
class PriorityQueue {
constructor() {
// Separate queues for different priority levels
this.queues = {
high: [], // Premium users
medium: [], // Standard users
low: [] // Free users
};
// Define weights for fair scheduling
this.weights = {
high: 4, // Process 4 high priority requests
medium: 2, // Then 2 medium priority requests
low: 1 // Then 1 low priority request
};
this.counters = {
high: 0,
medium: 0,
low: 0
};
this.processing = false;
}
add(priority, data) {
return new Promise((resolve, reject) => {
if (!this.queues[priority]) {
return reject(new Error(`Invalid priority: ${priority}`));
}
// Add to appropriate queue
this.queues[priority].push({
data,
resolve,
reject,
addedAt: Date.now()
});
// Start processing if not already running
if (!this.processing) {
this.process();
}
});
}
getNextPriority() {
// Increment counters and determine which queue to process next
const priorities = Object.keys(this.weights);
// First check if any queue is empty and skip it
for (const priority of priorities) {
if (this.queues[priority].length === 0) {
this.counters[priority] = 0;
}
}
// Check counters against weights
for (const priority of priorities) {
if (this.queues[priority].length > 0 &&
this.counters[priority] < this.weights[priority]) {
this.counters[priority]++;
return priority;
}
}
// If all counters have reached their weights, reset them
for (const priority of priorities) {
this.counters[priority] = 0;
}
// Find first non-empty queue
for (const priority of priorities) {
if (this.queues[priority].length > 0) {
this.counters[priority]++;
return priority;
}
}
return null; // All queues are empty
}
async process() {
this.processing = true;
// Get the next priority level to process
const priority = this.getNextPriority();
if (!priority) {
this.processing = false;
return; // No items to process
}
// Get the next item from the queue
const item = this.queues[priority].shift();
try {
// Process the item (e.g., make OpenAI request)
const result = await processOpenAIRequest(item.data);
item.resolve(result);
} catch (error) {
item.reject(error);
}
// Continue processing
setImmediate(() => this.process());
}
// Helper method to get queue statistics
getStats() {
const stats = {};
for (const priority in this.queues) {
stats[priority] = {
length: this.queues[priority].length,
oldestItem: this.queues[priority].length > 0
? Date.now() - this.queues[priority][0].addedAt
: 0
};
}
return stats;
}
}
// Shared token bucket so every request draws from the same 60 RPM budget
const tokenBucket = new TokenBucket(60, 60 / 60000); // 60 RPM
// Function to process OpenAI request
async function processOpenAIRequest(data) {
// Rate limiting logic: wait until a token is available before calling the API
if (!tokenBucket.take()) {
const waitTime = tokenBucket.getWaitTimeInMs();
await new Promise(resolve => setTimeout(resolve, waitTime));
tokenBucket.take();
}
// Make the actual OpenAI request
const completion = await openai.createCompletion({
model: data.model || "text-davinci-003",
prompt: data.prompt,
max_tokens: 1000
});
return completion.data.choices[0].text;
}
// Express route using the priority queue
const priorityQueue = new PriorityQueue();
app.post('/api/generate', async (req, res) => {
const { prompt, model, userId, userTier } = req.body;
// Determine priority based on user tier
let priority;
switch (userTier) {
case 'premium':
priority = 'high';
break;
case 'standard':
priority = 'medium';
break;
default:
priority = 'low';
}
try {
// Add to priority queue
const result = await priorityQueue.add(priority, {
prompt,
model,
userId
});
res.json({ result });
} catch (error) {
console.error('Error processing request:', error);
res.status(500).json({ error: error.message });
}
});
// Endpoint to check queue status
app.get('/api/queue-status', (req, res) => {
res.json(priorityQueue.getStats());
});
Step 10: Monitoring and Debugging Your Queue System
Implement monitoring to track queue performance:
const express = require('express');
const prometheus = require('prom-client');
// Create a Registry to register metrics
const register = new prometheus.Registry();
// Add default metrics (GC, memory usage, etc.)
prometheus.collectDefaultMetrics({ register });
// Create custom metrics
const queueSizeGauge = new prometheus.Gauge({
name: 'openai_queue_size',
help: 'Current size of the OpenAI request queue',
labelNames: ['priority']
});
const requestDurationHistogram = new prometheus.Histogram({
name: 'openai_request_duration_seconds',
help: 'Histogram of OpenAI request processing time',
labelNames: ['model', 'status']
});
const rateLimit = new prometheus.Gauge({
name: 'openai_rate_limit_remaining',
help: 'Remaining capacity before hitting rate limit'
});
// Register the metrics
register.registerMetric(queueSizeGauge);
register.registerMetric(requestDurationHistogram);
register.registerMetric(rateLimit);
// Middleware to update metrics
function updateMetrics(req, res, next) {
// Record request time
const end = requestDurationHistogram.startTimer();
// Update on response finish
res.on('finish', () => {
end({ model: req.body.model || 'unknown', status: res.statusCode });
});
next();
}
// Setup Express app
const app = express();
app.use(express.json());
app.use('/api', updateMetrics);
// Expose metrics endpoint for Prometheus
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
// Example function to update queue metrics
// (assumes the priorityQueue from Step 9 and a token-bucket style rateLimiter with a `tokens` property are in scope)
function updateQueueMetrics(queue) {
// Update queue size metrics
for (const priority in queue.queues) {
queueSizeGauge.set({ priority }, queue.queues[priority].length);
}
// Update rate limit metrics
rateLimit.set(rateLimiter.tokens);
}
// Call this function periodically
setInterval(() => {
updateQueueMetrics(priorityQueue);
}, 5000);
// Start server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`Metrics server running on port ${PORT}`);
});
Step 11: Implementing Graceful Error Handling and Retries
Add robust error handling and retry logic:
// Utility for exponential backoff retries
class RetryHandler {
constructor(maxRetries = 3, initialDelay = 1000, maxDelay = 30000) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.maxDelay = maxDelay;
}
async retry(fn, context = {}) {
let lastError;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
// Try to execute the function
return await fn(attempt);
} catch (error) {
lastError = error;
// Check if we should retry based on the error
if (!this.isRetryable(error) || attempt >= this.maxRetries) {
break;
}
// Calculate delay with exponential backoff and jitter
const delay = Math.min(
this.maxDelay,
this.initialDelay * Math.pow(2, attempt) * (0.8 + Math.random() * 0.4)
);
console.log(`Retry attempt ${attempt + 1}/${this.maxRetries} after ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
// If we get here, all retries failed
throw lastError;
}
isRetryable(error) {
// Determine if an error should trigger a retry
// Network errors are generally retryable
if (error.code === 'ECONNRESET' ||
error.code === 'ETIMEDOUT' ||
error.code === 'ECONNREFUSED') {
return true;
}
// API rate limits should be retried
if (error.response && error.response.status === 429) {
return true;
}
// Server errors may be retryable
if (error.response && error.response.status >= 500 && error.response.status < 600) {
return true;
}
// Some OpenAI-specific errors are retryable
if (error.response &&
error.response.data &&
error.response.data.error &&
(error.response.data.error.type === 'server_error' ||
error.response.data.error.type === 'rate_limit_exceeded')) {
return true;
}
return false;
}
}
// Usage example
const retryHandler = new RetryHandler(3, 1000, 30000);
async function processRequestWithRetries(data) {
return await retryHandler.retry(async (attempt) => {
console.log(`Processing request, attempt ${attempt + 1}`);
// OpenAI API call
const completion = await openai.createCompletion({
model: data.model || "text-davinci-003",
prompt: data.prompt,
max_tokens: 1000
});
return completion.data.choices[0].text;
});
}
// Express route using retry handler
app.post('/api/generate-with-retry', async (req, res) => {
const { prompt, model } = req.body;
if (!prompt) {
return res.status(400).json({ error: 'Prompt is required' });
}
try {
const result = await processRequestWithRetries({
prompt,
model
});
res.json({ result });
} catch (error) {
console.error('Error after all retries:', error);
// Provide helpful error message based on the type of error
if (error.response && error.response.status === 429) {
res.status(429).json({
error: 'Rate limit exceeded. Please try again later.',
retryAfter: error.response.headers['retry-after'] || 60
});
} else if (error.response && error.response.status >= 400) {
res.status(error.response.status).json({
error: (error.response.data && error.response.data.error && error.response.data.error.message) || 'API error'
});
} else {
res.status(500).json({ error: 'Failed to process request' });
}
}
});
Step 12: Implementing a Complete End-to-End Solution
Here's a full implementation tying all the concepts together:
// server.js - Main server file
require('dotenv').config();
const express = require('express');
const Bull = require('bull');
const { v4: uuidv4 } = require('uuid');
const { Configuration, OpenAIApi } = require("openai");
const prometheusClient = require('prom-client');
// Initialize Express app
const app = express();
app.use(express.json());
app.use(express.static('public'));
// Set up Prometheus metrics
const register = new prometheusClient.Registry();
prometheusClient.collectDefaultMetrics({ register });
// Create custom metrics
const queueSizeGauge = new prometheusClient.Gauge({
name: 'openai_queue_size',
help: 'Current size of the OpenAI request queue',
labelNames: ['priority']
});
const requestDurationHistogram = new prometheusClient.Histogram({
name: 'openai_request_duration_seconds',
help: 'Histogram of OpenAI request processing time',
labelNames: ['model', 'status']
});
register.registerMetric(queueSizeGauge);
register.registerMetric(requestDurationHistogram);
// Initialize OpenAI API
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Create Bull queues for different priorities
const queues = {
high: new Bull('openai-high-priority', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
},
limiter: {
max: 40, // Higher throughput for premium users
duration: 60000 // 1 minute
}
}),
standard: new Bull('openai-standard-priority', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
},
limiter: {
max: 20, // Standard throughput
duration: 60000
}
}),
low: new Bull('openai-low-priority', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
},
limiter: {
max: 10, // Lower throughput for free users
duration: 60000
}
})
};
// Retry utility
class RetryHandler {
constructor(maxRetries = 3, initialDelay = 1000, maxDelay = 30000) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.maxDelay = maxDelay;
}
async retry(fn) {
let lastError;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await fn(attempt);
} catch (error) {
lastError = error;
if (!this.isRetryable(error) || attempt >= this.maxRetries) {
break;
}
const delay = Math.min(
this.maxDelay,
this.initialDelay * Math.pow(2, attempt) * (0.8 + Math.random() * 0.4)
);
console.log(`Retry attempt ${attempt + 1}/${this.maxRetries} after ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError;
}
isRetryable(error) {
// Network errors
if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') {
return true;
}
// Rate limits and server errors
if (error.response && (error.response.status === 429 ||
(error.response.status >= 500 && error.response.status < 600))) {
return true;
}
return false;
}
}
const retryHandler = new RetryHandler();
// Process jobs from each queue
for (const [priority, queue] of Object.entries(queues)) {
queue.process(async (job) => {
const { prompt, model, userId } = job.data;
const startTime = Date.now();
console.log(`Processing ${priority} priority job ${job.id} for user ${userId}`);
try {
const result = await retryHandler.retry(async () => {
const completion = await openai.createCompletion({
model: model || "text-davinci-003",
prompt: prompt,
max_tokens: 1000
});
return completion.data.choices[0].text;
});
const duration = (Date.now() - startTime) / 1000;
requestDurationHistogram.observe({ model: model || "text-davinci-003", status: 'success' }, duration);
return { result };
} catch (error) {
const duration = (Date.now() - startTime) / 1000;
requestDurationHistogram.observe({ model: model || "text-davinci-003", status: 'error' }, duration);
console.error(`Error processing job ${job.id}:`, error);
throw new Error(`Failed to process OpenAI request: ${error.message}`);
}
});
// Handle completion and failure events
queue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result`);
});
queue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed with error:`, error.message);
});
// Update queue metrics every 5 seconds
setInterval(async () => {
const jobCounts = await queue.getJobCounts();
queueSizeGauge.set({ priority }, jobCounts.waiting + jobCounts.active);
}, 5000);
}
// API routes
app.post('/api/generate', async (req, res) => {
const { prompt, model, userId, userTier } = req.body;
if (!prompt) {
return res.status(400).json({ error: 'Prompt is required' });
}
// Determine queue based on user tier (the key doubles as the priority name used by /api/status)
let priority;
switch (userTier) {
case 'premium':
priority = 'high';
break;
case 'standard':
priority = 'standard';
break;
default:
priority = 'low';
}
const queue = queues[priority];
try {
// Add job to the appropriate queue
const job = await queue.add({
prompt,
model: model || 'text-davinci-003',
userId: userId || 'anonymous'
}, {
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 100, // Keep last 100 failed jobs
attempts: 3, // Retry up to 3 times
backoff: {
type: 'exponential',
delay: 1000 // Initial delay of 1 second
}
});
// Return the job ID and queue name so the client can poll /api/status/:jobId?priority=...
res.json({
message: 'Request queued successfully',
jobId: job.id,
priority
});
} catch (error) {
console.error('Failed to queue request:', error);
res.status(500).json({ error: 'Failed to queue request' });
}
});
// Get job status
app.get('/api/status/:jobId', async (req, res) => {
const { jobId } = req.params;
const { priority } = req.query;
if (!priority || !queues[priority]) {
return res.status(400).json({ error: 'Valid priority must be specified' });
}
try {
const job = await queues[priority].getJob(jobId);
if (!job) {
return res.status(404).json({ error: 'Job not found' });
}
const state = await job.getState();
const result = job.returnvalue;
res.json({
id: job.id,
state,
result: state === 'completed' ? result : null,
reason: job.failedReason
});
} catch (error) {
console.error('Error getting job status:', error);
res.status(500).json({ error: 'Failed to get job status' });
}
});
// Queue statistics endpoint
app.get('/api/queue-stats', async (req, res) => {
try {
const stats = {};
for (const [priority, queue] of Object.entries(queues)) {
const jobCounts = await queue.getJobCounts();
stats[priority] = jobCounts;
}
res.json(stats);
} catch (error) {
console.error('Error getting queue stats:', error);
res.status(500).json({ error: 'Failed to get queue statistics' });
}
});
// Prometheus metrics endpoint
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
// Health check endpoint
app.get('/health', (req, res) => {
res.json({ status: 'ok' });
});
// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
Step 13: Creating a Client-Side Polling System
Implement a client-side solution to check job status:
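Here is a minimal browser-side sketch that submits a prompt to the Step 12 server and polls /api/status until the job finishes; the element ids and the hard-coded userTier are illustrative:
// Browser-side polling sketch for the Step 12 server.
async function pollJob(jobId, priority, intervalMs = 2000, maxAttempts = 30) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const response = await fetch(`/api/status/${jobId}?priority=${priority}`);
    const data = await response.json();
    if (data.state === 'completed') {
      return data.result; // The Bull job returns { result }
    }
    if (data.state === 'failed') {
      throw new Error(data.reason || 'Job failed');
    }
    // Still waiting or active; pause before polling again
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
  throw new Error('Timed out waiting for the job to complete');
}

document.getElementById('generate-btn').addEventListener('click', async () => {
  const output = document.getElementById('output');
  output.textContent = 'Queuing request...';
  try {
    const response = await fetch('/api/generate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ prompt: document.getElementById('prompt').value, userTier: 'standard' })
    });
    const { jobId, priority } = await response.json();
    output.textContent = `Queued as job ${jobId}, waiting for result...`;
    const result = await pollJob(jobId, priority);
    output.textContent = result.result || JSON.stringify(result);
  } catch (error) {
    output.textContent = `Error: ${error.message}`;
  }
});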
Step 14: Deploying Your Queue System
Here's a Docker Compose setup for deploying your queue system:
version: '3.8'
services:
  # Redis for queue storage
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped
  # API server
  api-server:
    build:
      context: .
      dockerfile: Dockerfile.server
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - PORT=3000
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - MAX_REQUESTS_PER_MINUTE=60
    depends_on:
      - redis
    restart: unless-stopped
  # Worker processes
  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - MAX_REQUESTS_PER_MINUTE=60
    depends_on:
      - redis
    restart: unless-stopped
    # Scale to multiple workers
    deploy:
      replicas: 3
  # Prometheus for monitoring
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    restart: unless-stopped
  # Grafana for dashboards
  grafana:
    image: grafana/grafana
    ports:
      - "3001:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - prometheus
    restart: unless-stopped
volumes:
  redis-data:
  prometheus-data:
  grafana-data:
Create a Dockerfile for the server:
# Dockerfile.server
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install --production
COPY . .
EXPOSE 3000
CMD ["node", "server.js"]
Create a Dockerfile for the worker:
# Dockerfile.worker
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install --production
COPY . .
CMD ["node", "worker.js"]
Set up Prometheus configuration:
# prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'openai-queue'
    scrape_interval: 5s
    static_configs:
      - targets: ['api-server:3000']
        labels:
          service: 'api-server'
Step 15: Testing and Optimizing Your Queue System
Create a load testing script to verify your queue works properly:
// load-test.js
const axios = require('axios');
const API_URL = 'http://localhost:3000/api/generate';
const NUM_REQUESTS = 100;
const CONCURRENCY = 20;
async function sendRequest(id) {
try {
console.log(`Starting request ${id}`);
const startTime = Date.now();
const response = await axios.post(API_URL, {
prompt: `Generate a creative story about a ${['robot', 'wizard', 'astronaut', 'detective'][id % 4]} in ${Math.floor(Math.random() * 100)} words.`,
model: 'text-davinci-003',
userTier: ['free', 'standard', 'premium'][id % 3],
userId: `load-test-user-${id}`
});
const jobId = response.data.jobId;
const priority = response.data.priority;
console.log(`Request ${id} queued with job ID ${jobId} (priority: ${priority})`);
// Poll for results
let completed = false;
let attempts = 0;
while (!completed && attempts < 30) {
await new Promise(resolve => setTimeout(resolve, 2000));
attempts++;
try {
const statusResponse = await axios.get(`http://localhost:3000/api/status/${jobId}?priority=${priority}`);
const data = statusResponse.data;
if (data.state === 'completed') {
const totalTime = Date.now() - startTime;
console.log(`Request ${id} completed in ${totalTime}ms`);
completed = true;
} else if (data.state === 'failed') {
console.error(`Request ${id} failed: ${data.reason}`);
completed = true;
} else {
console.log(`Request ${id} status: ${data.state} (attempt ${attempts})`);
}
} catch (error) {
console.error(`Error checking status for request ${id}:`, error.message);
}
}
if (!completed) {
console.error(`Request ${id} timed out after ${attempts} attempts`);
}
} catch (error) {
console.error(`Error sending request ${id}:`, error.message);
}
}
async function runLoadTest() {
console.log(`Starting load test with ${NUM_REQUESTS} requests (${CONCURRENCY} concurrent)`);
const startTime = Date.now();
// Keep at most CONCURRENCY requests in flight at any time
const inFlight = new Set();
function launch(id) {
const promise = sendRequest(id).finally(() => inFlight.delete(promise));
inFlight.add(promise);
}
// Start initial batch of concurrent requests
for (let i = 0; i < Math.min(CONCURRENCY, NUM_REQUESTS); i++) {
launch(i);
}
// Add remaining requests as others complete
let next = Math.min(CONCURRENCY, NUM_REQUESTS);
while (next < NUM_REQUESTS) {
await Promise.race(inFlight);
launch(next);
next++;
}
// Wait for the remaining in-flight requests to finish
await Promise.all(inFlight);
const totalTime = (Date.now() - startTime) / 1000;
console.log(`Load test completed in ${totalTime} seconds`);
console.log(`Average request rate: ${NUM_REQUESTS / totalTime} req/sec`);
}
runLoadTest().catch(error => {
console.error('Load test failed:', error);
});
Step 16: Handling Large Language Model (LLM) Specific Considerations
For models that require token counting, implement this utility:
// token-counter.js
// Requires the gpt-3-encoder package: npm install gpt-3-encoder
const { encode } = require('gpt-3-encoder');
class TokenCounter {
constructor(maxTokensPerMinute = 90000) {
this.maxTokensPerMinute = maxTokensPerMinute;
this.tokensThisMinute = 0;
this.lastResetTime = Date.now();
}
resetIfNeeded() {
const now = Date.now();
if (now - this.lastResetTime > 60000) {
console.log(`Resetting token counter. Used ${this.tokensThisMinute} tokens in the last minute.`);
this.tokensThisMinute = 0;
this.lastResetTime = now;
return true;
}
return false;
}
countTokens(text) {
return encode(text).length;
}
estimateTokensForCompletion(prompt, maxTokens) {
// Count prompt tokens
const promptTokens = this.countTokens(prompt);
// Estimate total tokens (prompt + expected completion)
// This is a rough estimate as the actual completion length varies
const estimatedTotalTokens = promptTokens + maxTokens;
return {
promptTokens,
estimatedTotalTokens
};
}
canMakeRequest(estimatedTokens) {
this.resetIfNeeded();
return this.tokensThisMinute + estimatedTokens <= this.maxTokensPerMinute;
}
recordTokenUsage(actualTokens) {
this.resetIfNeeded();
this.tokensThisMinute += actualTokens;
return this.tokensThisMinute;
}
getRemainingTokens() {
this.resetIfNeeded();
return Math.max(0, this.maxTokensPerMinute - this.tokensThisMinute);
}
getEstimatedWaitTime(tokensNeeded) {
if (this.getRemainingTokens() >= tokensNeeded) {
return 0;
}
// Calculate how much time until next reset
const millisTillReset = 60000 - (Date.now() - this.lastResetTime);
// If reset is very soon, just wait for it
if (millisTillReset < 5000) {
return millisTillReset;
}
// Otherwise, calculate wait time based on token rate
return Math.ceil(millisTillReset * (tokensNeeded / this.maxTokensPerMinute));
}
}
module.exports = TokenCounter;
Integrate the token counter with your queue processor:
// worker-with-token-counting.js
require('dotenv').config();
const Bull = require('bull');
const { Configuration, OpenAIApi } = require("openai");
const TokenCounter = require('./token-counter');
// Initialize OpenAI API
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
// Initialize token counter (90K tokens per minute is a common limit)
const tokenCounter = new TokenCounter(90000);
// Create queue
const openaiQueue = new Bull('openai-requests', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
}
});
// Process jobs
openaiQueue.process(async (job) => {
const { prompt, model, maxTokens = 1000 } = job.data;
// Estimate token usage
const { estimatedTotalTokens } = tokenCounter.estimateTokensForCompletion(prompt, maxTokens);
// Check if we have enough tokens available
if (!tokenCounter.canMakeRequest(estimatedTotalTokens)) {
const waitTime = tokenCounter.getEstimatedWaitTime(estimatedTotalTokens);
if (waitTime > 0) {
console.log(`Token limit reached. Waiting ${waitTime}ms before processing job ${job.id}`);
await new Promise(resolve => setTimeout(resolve, waitTime));
// Recheck after waiting
if (!tokenCounter.canMakeRequest(estimatedTotalTokens)) {
throw new Error('Token limit still exceeded after waiting. Please try again later.');
}
}
}
try {
// Process the OpenAI request
const completion = await openai.createCompletion({
model: model || "text-davinci-003",
prompt: prompt,
max_tokens: maxTokens
});
const result = completion.data.choices[0].text;
// Record actual token usage
const actualPromptTokens = completion.data.usage.prompt_tokens;
const actualCompletionTokens = completion.data.usage.completion_tokens;
const totalTokens = completion.data.usage.total_tokens;
tokenCounter.recordTokenUsage(totalTokens);
console.log(`Job ${job.id} used ${totalTokens} tokens (${actualPromptTokens} prompt, ${actualCompletionTokens} completion)`);
console.log(`Remaining tokens this minute: ${tokenCounter.getRemainingTokens()}`);
return {
result,
usage: {
promptTokens: actualPromptTokens,
completionTokens: actualCompletionTokens,
totalTokens: totalTokens
}
};
} catch (error) {
console.error(`Error processing job ${job.id}:`, error);
// Check for token-related errors
if (error.response &&
error.response.data &&
error.response.data.error &&
error.response.data.error.type === 'tokens') {
// If the error is token-related, update our counter to reflect the limit
tokenCounter.recordTokenUsage(tokenCounter.maxTokensPerMinute);
}
throw error;
}
});
// Handle events
openaiQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed successfully`);
});
openaiQueue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed with error: ${error.message}`);
});
console.log('Worker started with token counting');
Conclusion
Queuing OpenAI requests is essential when dealing with high-traffic applications. By implementing a robust queue system, you can:
- Stay within OpenAI's rate limits and avoid 429 errors
- Keep your application responsive during traffic spikes by processing requests asynchronously
- Prioritize premium users while still serving free users fairly
- Retry transient failures automatically and monitor queue depth, latency, and token usage
The solution you choose will depend on your specific needs and infrastructure. For simple applications, an in-memory queue might be sufficient. For production systems, Redis-based queues like Bull or cloud services like AWS SQS provide scalability and reliability. By following the steps in this guide, you can build a queue system that efficiently manages OpenAI API requests and provides a smooth experience for your users.