/n8n-tutorials

How to queue OpenAI requests when too many users hit the workflow at once?

Learn how to queue OpenAI API requests effectively using rate limiting and queue systems like Redis, RabbitMQ, or AWS SQS to handle high user traffic and avoid rate limit errors.

Matt Graham, CEO of Rapid Developers

Book a call with an Expert

Starting a new venture? Need to upgrade your web app? RapidDev builds application with your growth in mind.

Book a free consultation

How to queue OpenAI requests when too many users hit the workflow at once?

To queue OpenAI requests when too many users hit the workflow at once, implement a request rate limiter using techniques like token bucket algorithm, leaky bucket algorithm, or use a queue system like Redis, RabbitMQ, or AWS SQS. This prevents API rate limit errors and ensures efficient processing by managing the flow of requests to stay within OpenAI's rate limits.

 

Step 1: Understanding OpenAI's Rate Limits

 

Before implementing a queue system, it's important to understand OpenAI's rate limits. These limits vary depending on your subscription tier and the specific models you're using. Rate limits are typically measured in:

  • Requests per minute (RPM)
  • Tokens per minute (TPM)

For example, at the time of writing, free tier users might have limits like 3 RPM and 40,000 TPM for GPT-3.5-Turbo, while paid tiers have higher limits.

You can find the current rate limits in your OpenAI account dashboard or in their API documentation. Understanding these limits is crucial for designing an effective queuing system.

 

Step 2: Choose a Queuing Approach

 

There are several approaches to implement a queuing system:

  • In-memory queue: Simple but limited to a single server
  • Redis-based queue: Distributed, fast, and scalable
  • Message brokers: RabbitMQ, AWS SQS, Google Pub/Sub
  • Database-backed queue: PostgreSQL, MongoDB

For most production applications, Redis or a dedicated message broker is recommended for reliability and scalability.

 

Step 3: Setting Up a Basic In-Memory Queue with Node.js

 

Let's start with a simple in-memory queue implementation using Node.js:


const { Configuration, OpenAIApi } = require("openai");
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Queue implementation
class RequestQueue {
  constructor(maxRequestsPerMinute) {
    this.queue = [];
    this.processing = false;
    this.maxRequestsPerMinute = maxRequestsPerMinute;
    this.requestsThisMinute = 0;
    this.resetTime = Date.now() + 60000; // Reset after a minute
  }

  async add(prompt, model, res) {
    return new Promise((resolve, reject) => {
      this.queue.push({
        prompt,
        model,
        res,
        resolve,
        reject
      });
      
      if (!this.processing) {
        this.processQueue();
      }
    });
  }

  async processQueue() {
    if (this.queue.length === 0) {
      this.processing = false;
      return;
    }

    this.processing = true;

    // Check if we need to reset the counter
    if (Date.now() > this.resetTime) {
      this.requestsThisMinute = 0;
      this.resetTime = Date.now() + 60000;
    }

    // Check if we've hit the rate limit
    if (this.requestsThisMinute >= this.maxRequestsPerMinute) {
      const waitTime = this.resetTime - Date.now();
      console.log(`Rate limit reached. Waiting ${waitTime}ms before next request.`);
      setTimeout(() => this.processQueue(), waitTime);
      return;
    }

    // Process the next item in the queue
    const { prompt, model, res, resolve, reject } = this.queue.shift();
    this.requestsThisMinute++;

    try {
      const completion = await openai.createCompletion({
        model: model || "text-davinci-003",
        prompt: prompt,
        max\_tokens: 1000
      });
      
      // If you're using this in an Express app, you might want to send the response
      if (res) {
        res.json({ result: completion.data.choices[0].text });
      }
      
      resolve(completion.data.choices[0].text);
    } catch (error) {
      console.error("Error calling OpenAI API:", error);
      if (res) {
        res.status(500).json({ error: "Failed to process request" });
      }
      reject(error);
    }

    // Process the next item after a small delay to prevent overloading
    setTimeout(() => this.processQueue(), 100);
  }
}

// Create a queue with a rate limit of 60 requests per minute
const requestQueue = new RequestQueue(60);

// Express route example
app.post('/generate-text', async (req, res) => {
  try {
    const { prompt, model } = req.body;
    await requestQueue.add(prompt, model, res);
    // Note: response is handled inside the queue processing
  } catch (error) {
    console.error("Failed to add request to queue:", error);
    res.status(500).json({ error: "Failed to process request" });
  }
});

This implementation creates a basic queue that respects rate limits and processes requests in the order they were received.

 

Step 4: Implementing a Redis-Based Queue

 

For a more robust solution that works across multiple servers, let's implement a Redis-based queue:

First, install the necessary packages:


npm install redis bull dotenv openai express

Then create a queuing system using Bull:


require('dotenv').config();
const express = require('express');
const Bull = require('bull');
const { Configuration, OpenAIApi } = require("openai");

// Initialize Express
const app = express();
app.use(express.json());

// Initialize OpenAI
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Create a Bull queue
const openaiQueue = new Bull('openai-requests', {
  redis: {
    host: process.env.REDIS\_HOST || 'localhost',
    port: process.env.REDIS\_PORT || 6379,
    password: process.env.REDIS\_PASSWORD
  },
  limiter: {
    max: 60, // Maximum number of jobs processed per minute
    duration: 60000 // 1 minute in milliseconds
  }
});

// Process the queue
openaiQueue.process(async (job) => {
  const { prompt, model, userId } = job.data;
  
  try {
    console.log(`Processing job ${job.id} for user ${userId}`);
    
    const completion = await openai.createCompletion({
      model: model || "text-davinci-003",
      prompt: prompt,
      max\_tokens: 1000
    });
    
    return { result: completion.data.choices[0].text };
  } catch (error) {
    console.error(`Error processing job ${job.id}:`, error.message);
    throw new Error(`Failed to process OpenAI request: ${error.message}`);
  }
});

// Express route to add jobs to the queue
app.post('/api/generate', async (req, res) => {
  const { prompt, model, userId } = req.body;
  
  if (!prompt) {
    return res.status(400).json({ error: 'Prompt is required' });
  }
  
  try {
    // Add the job to the queue
    const job = await openaiQueue.add({
      prompt,
      model,
      userId: userId || 'anonymous'
    }, {
      removeOnComplete: true, // Remove jobs from queue when completed
      attempts: 3, // Retry failed jobs up to 3 times
      backoff: {
        type: 'exponential',
        delay: 1000 // Initial delay before retrying in milliseconds
      }
    });
    
    // Return job ID immediately so client can poll for results
    res.json({ 
      message: 'Request queued successfully', 
      jobId: job.id
    });
    
    // Alternative: wait for job to complete and return results
    // const result = await job.finished();
    // res.json(result);
  } catch (error) {
    console.error('Failed to queue request:', error);
    res.status(500).json({ error: 'Failed to queue request' });
  }
});

// Route to check job status
app.get('/api/status/:jobId', async (req, res) => {
  const { jobId } = req.params;
  
  try {
    const job = await openaiQueue.getJob(jobId);
    
    if (!job) {
      return res.status(404).json({ error: 'Job not found' });
    }
    
    const state = await job.getState();
    const result = job.returnvalue;
    
    res.json({
      id: job.id,
      state,
      result: state === 'completed' ? result : null,
      reason: job.failedReason
    });
  } catch (error) {
    console.error('Error getting job status:', error);
    res.status(500).json({ error: 'Failed to get job status' });
  }
});

// Handle queue events for monitoring
openaiQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

openaiQueue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed with error:`, error.message);
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

 

Step 5: Implementing a Client-Side Solution

 

For client-side implementation, add retry logic and feedback for users:


// HTML template



    
    
    OpenAI API Client
    


    

OpenAI Text Generator

 

Step 6: Advanced Queue Management with RabbitMQ

 

For even more scalable queue management, you can use RabbitMQ. First, install the required packages:


npm install amqplib openai express dotenv

Then implement a producer-consumer pattern:


// producer.js - API server that receives client requests
require('dotenv').config();
const express = require('express');
const amqp = require('amqplib');
const { v4: uuidv4 } = require('uuid');

const app = express();
app.use(express.json());

// Store for mapping correlation IDs to response objects
const pendingRequests = new Map();

// Connect to RabbitMQ
async function connectRabbitMQ() {
  try {
    const connection = await amqp.connect(process.env.RABBITMQ\_URL || 'amqp://localhost');
    const channel = await connection.createChannel();
    
    // Declare the request queue
    await channel.assertQueue('openai\_requests', {
      durable: true // Queue survives broker restart
    });
    
    // Declare the response queue
    await channel.assertQueue('openai\_responses', {
      durable: true
    });
    
    // Set up consumer for responses
    await channel.consume('openai\_responses', async (msg) => {
      if (msg) {
        const correlationId = msg.properties.correlationId;
        const responseData = JSON.parse(msg.content.toString());
        
        // Find the pending request
        const pendingRequest = pendingRequests.get(correlationId);
        if (pendingRequest) {
          const { res, timer } = pendingRequest;
          clearTimeout(timer); // Clear the timeout
          
          // Send the response to the client
          if (responseData.error) {
            res.status(500).json({ error: responseData.error });
          } else {
            res.json(responseData);
          }
          
          // Remove from pending requests
          pendingRequests.delete(correlationId);
        }
        
        channel.ack(msg);
      }
    });
    
    return channel;
  } catch (error) {
    console.error('Error connecting to RabbitMQ:', error);
    throw error;
  }
}

// Initialize RabbitMQ connection
let rabbitChannel;
connectRabbitMQ()
  .then(channel => {
    rabbitChannel = channel;
    console.log('Connected to RabbitMQ');
  })
  .catch(err => {
    console.error('Failed to connect to RabbitMQ:', err);
    process.exit(1);
  });

// API route to handle text generation requests
app.post('/api/generate', async (req, res) => {
  if (!rabbitChannel) {
    return res.status(503).json({ error: 'Service unavailable. Queue system not ready.' });
  }
  
  const { prompt, model, userId } = req.body;
  
  if (!prompt) {
    return res.status(400).json({ error: 'Prompt is required' });
  }
  
  try {
    // Generate a unique correlation ID for this request
    const correlationId = uuidv4();
    
    // Create a timeout to handle cases where no response is received
    const timer = setTimeout(() => {
      if (pendingRequests.has(correlationId)) {
        pendingRequests.get(correlationId).res.status(504).json({ 
          error: 'Request timed out while waiting for a response' 
        });
        pendingRequests.delete(correlationId);
      }
    }, 30000); // 30 second timeout
    
    // Store the response object and timer for later use
    pendingRequests.set(correlationId, { res, timer });
    
    // Publish the request to the queue
    rabbitChannel.sendToQueue('openai\_requests', 
      Buffer.from(JSON.stringify({
        prompt,
        model: model || 'text-davinci-003',
        userId: userId || 'anonymous',
        timestamp: Date.now()
      })),
      {
        persistent: true, // Message survives broker restart
        correlationId,
        replyTo: 'openai\_responses'
      }
    );
    
    // Note: We don't respond here. The response will be sent
    // when we receive a message on the response queue
  } catch (error) {
    console.error('Error queuing request:', error);
    res.status(500).json({ error: 'Failed to queue request' });
  }
});

// Health check endpoint
app.get('/health', (req, res) => {
  res.json({ status: 'ok', queueConnected: !!rabbitChannel });
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Producer server running on port ${PORT}`);
});

Next, create the consumer to process queued requests:


// consumer.js - Worker that processes OpenAI requests
require('dotenv').config();
const amqp = require('amqplib');
const { Configuration, OpenAIApi } = require("openai");

// Initialize OpenAI
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Rate limit tracking
const rateLimiter = {
  requestsThisMinute: 0,
  lastResetTime: Date.now(),
  maxRequestsPerMinute: parseInt(process.env.MAX_REQUESTS_PER\_MINUTE || '60'),
  
  canMakeRequest() {
    const now = Date.now();
    
    // Reset counter if a minute has passed
    if (now - this.lastResetTime > 60000) {
      this.requestsThisMinute = 0;
      this.lastResetTime = now;
    }
    
    return this.requestsThisMinute < this.maxRequestsPerMinute;
  },
  
  incrementCounter() {
    this.requestsThisMinute++;
  }
};

// Connect to RabbitMQ and process messages
async function start() {
  try {
    // Connect to RabbitMQ
    const connection = await amqp.connect(process.env.RABBITMQ\_URL || 'amqp://localhost');
    const channel = await connection.createChannel();
    
    // Declare the request queue
    await channel.assertQueue('openai\_requests', {
      durable: true
    });
    
    // Declare the response queue
    await channel.assertQueue('openai\_responses', {
      durable: true
    });
    
    // Set prefetch to limit the number of unacknowledged messages
    // This prevents the worker from being overwhelmed
    channel.prefetch(1);
    
    console.log('Consumer waiting for messages...');
    
    // Process messages from the queue
    await channel.consume('openai\_requests', async (msg) => {
      if (msg) {
        const { correlationId, replyTo } = msg.properties;
        const requestData = JSON.parse(msg.content.toString());
        
        console.log(`Processing request ${correlationId} from ${requestData.userId}`);
        
        try {
          // Check rate limits
          if (!rateLimiter.canMakeRequest()) {
            console.log('Rate limit reached, waiting...');
            
            // Requeue the message with a delay
            setTimeout(() => {
              channel.nack(msg, false, true);
            }, 5000); // Wait 5 seconds before requeuing
            
            return;
          }
          
          // Increment the rate limit counter
          rateLimiter.incrementCounter();
          
          // Process the OpenAI request
          const completion = await openai.createCompletion({
            model: requestData.model,
            prompt: requestData.prompt,
            max\_tokens: 1000
          });
          
          // Send the response back
          channel.sendToQueue(
            replyTo,
            Buffer.from(JSON.stringify({
              result: completion.data.choices[0].text,
              model: requestData.model,
              processingTime: Date.now() - requestData.timestamp
            })),
            { correlationId }
          );
          
          // Acknowledge the message
          channel.ack(msg);
        } catch (error) {
          console.error(`Error processing request ${correlationId}:`, error);
          
          // Send error response
          channel.sendToQueue(
            replyTo,
            Buffer.from(JSON.stringify({
              error: `Failed to process request: ${error.message}`,
              processingTime: Date.now() - requestData.timestamp
            })),
            { correlationId }
          );
          
          // Acknowledge the message to remove it from the queue
          // In a production system, you might want to handle certain errors differently
          channel.ack(msg);
        }
      }
    });
    
    // Handle process termination
    process.on('SIGINT', async () => {
      await channel.close();
      await connection.close();
      process.exit(0);
    });
  } catch (error) {
    console.error('Consumer error:', error);
    process.exit(1);
  }
}

start();

 

Step 7: Implementing AWS SQS for Cloud-Based Queuing

 

For AWS-based applications, SQS provides a reliable queuing service:


// Install required packages
// npm install aws-sdk openai express dotenv

// aws-sqs-producer.js
require('dotenv').config();
const express = require('express');
const AWS = require('aws-sdk');
const { v4: uuidv4 } = require('uuid');

// Configure AWS
AWS.config.update({
  region: process.env.AWS\_REGION || 'us-east-1',
  accessKeyId: process.env.AWS_ACCESS_KEY\_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS\_KEY
});

const sqs = new AWS.SQS();
const requestQueueUrl = process.env.SQS_REQUEST_QUEUE\_URL;
const app = express();

app.use(express.json());

// Route to handle text generation requests
app.post('/api/generate', async (req, res) => {
  const { prompt, model, userId } = req.body;
  
  if (!prompt) {
    return res.status(400).json({ error: 'Prompt is required' });
  }
  
  try {
    // Generate a unique ID for this request
    const requestId = uuidv4();
    
    // Send message to SQS queue
    await sqs.sendMessage({
      QueueUrl: requestQueueUrl,
      MessageBody: JSON.stringify({
        prompt,
        model: model || 'text-davinci-003',
        userId: userId || 'anonymous',
        timestamp: Date.now()
      }),
      MessageAttributes: {
        'RequestId': {
          DataType: 'String',
          StringValue: requestId
        }
      },
      // MessageGroupId is required for FIFO queues only
      // MessageGroupId: userId || 'default'
    }).promise();
    
    // Return the request ID to the client for status checking
    res.json({
      message: 'Request queued successfully',
      requestId
    });
  } catch (error) {
    console.error('Error sending message to SQS:', error);
    res.status(500).json({ error: 'Failed to queue request' });
  }
});

// Route to check status (in a real application, you'd store results in DynamoDB or similar)
app.get('/api/status/:requestId', (req, res) => {
  // In a real implementation, you would check a database for the result
  res.json({
    message: 'Status checking not implemented in this example',
    requestId: req.params.requestId
  });
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`SQS Producer server running on port ${PORT}`);
});

Now, create the consumer for AWS SQS:


// aws-sqs-consumer.js
require('dotenv').config();
const AWS = require('aws-sdk');
const { Configuration, OpenAIApi } = require("openai");

// Configure AWS
AWS.config.update({
  region: process.env.AWS\_REGION || 'us-east-1',
  accessKeyId: process.env.AWS_ACCESS_KEY\_ID,
  secretAccessKey: process.env.AWS_SECRET_ACCESS\_KEY
});

// Initialize AWS services
const sqs = new AWS.SQS();
const dynamodb = new AWS.DynamoDB.DocumentClient();

// Initialize OpenAI
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Queue URLs
const requestQueueUrl = process.env.SQS_REQUEST_QUEUE\_URL;

// DynamoDB table for storing results
const resultsTable = process.env.DYNAMODB_RESULTS_TABLE;

// Rate limiter
const rateLimiter = {
  requestsThisMinute: 0,
  lastResetTime: Date.now(),
  maxRequestsPerMinute: parseInt(process.env.MAX_REQUESTS_PER\_MINUTE || '60'),
  
  canMakeRequest() {
    const now = Date.now();
    
    if (now - this.lastResetTime > 60000) {
      this.requestsThisMinute = 0;
      this.lastResetTime = now;
    }
    
    return this.requestsThisMinute < this.maxRequestsPerMinute;
  },
  
  incrementCounter() {
    this.requestsThisMinute++;
  }
};

// Process messages from SQS
async function processMessages() {
  try {
    // Receive messages from SQS
    const response = await sqs.receiveMessage({
      QueueUrl: requestQueueUrl,
      MaxNumberOfMessages: 1,
      WaitTimeSeconds: 20, // Long polling
      MessageAttributeNames: ['All'],
      VisibilityTimeout: 30 // 30 seconds
    }).promise();
    
    if (!response.Messages || response.Messages.length === 0) {
      console.log('No messages received');
      return;
    }
    
    const message = response.Messages[0];
    const receiptHandle = message.ReceiptHandle;
    const requestData = JSON.parse(message.Body);
    const requestId = message.MessageAttributes.RequestId.StringValue;
    
    console.log(`Processing request ${requestId} from ${requestData.userId}`);
    
    try {
      // Check rate limits
      if (!rateLimiter.canMakeRequest()) {
        console.log('Rate limit reached, extending visibility timeout...');
        
        // Extend the visibility timeout to retry later
        await sqs.changeMessageVisibility({
          QueueUrl: requestQueueUrl,
          ReceiptHandle: receiptHandle,
          VisibilityTimeout: 10 // Try again in 10 seconds
        }).promise();
        
        return;
      }
      
      // Increment rate limit counter
      rateLimiter.incrementCounter();
      
      // Process the OpenAI request
      const completion = await openai.createCompletion({
        model: requestData.model,
        prompt: requestData.prompt,
        max\_tokens: 1000
      });
      
      const result = completion.data.choices[0].text;
      
      // Store the result in DynamoDB
      await dynamodb.put({
        TableName: resultsTable,
        Item: {
          requestId,
          userId: requestData.userId,
          result,
          model: requestData.model,
          timestamp: Date.now(),
          status: 'completed'
        }
      }).promise();
      
      console.log(`Request ${requestId} processed successfully`);
      
      // Delete the message from the queue
      await sqs.deleteMessage({
        QueueUrl: requestQueueUrl,
        ReceiptHandle: receiptHandle
      }).promise();
    } catch (error) {
      console.error(`Error processing request ${requestId}:`, error);
      
      // Store the error in DynamoDB
      await dynamodb.put({
        TableName: resultsTable,
        Item: {
          requestId,
          userId: requestData.userId,
          error: error.message,
          timestamp: Date.now(),
          status: 'failed'
        }
      }).promise();
      
      // Delete the message to prevent retries if this is a permanent error
      await sqs.deleteMessage({
        QueueUrl: requestQueueUrl,
        ReceiptHandle: receiptHandle
      }).promise();
    }
  } catch (error) {
    console.error('Error in message processing loop:', error);
  }
  
  // Continue processing messages
  setImmediate(processMessages);
}

// Start processing messages
console.log('SQS Consumer started');
processMessages();

 

Step 8: Implementing Rate Limiting with Token Bucket Algorithm

 

The token bucket algorithm is a more sophisticated approach to rate limiting:


class TokenBucket {
  constructor(maxTokens, refillRate) {
    this.maxTokens = maxTokens;       // Maximum tokens the bucket can hold
    this.refillRate = refillRate;      // Tokens added per millisecond
    this.tokens = maxTokens;           // Current token count, start full
    this.lastRefillTimestamp = Date.now();  // Last refill timestamp
  }
  
  refill() {
    const now = Date.now();
    const timePassed = now - this.lastRefillTimestamp;
    const tokensToAdd = timePassed \* this.refillRate;
    
    if (tokensToAdd > 0) {
      this.tokens = Math.min(this.maxTokens, this.tokens + tokensToAdd);
      this.lastRefillTimestamp = now;
    }
  }
  
  take(tokens = 1) {
    this.refill();
    
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    
    return false;
  }
  
  getWaitTimeInMs(tokens = 1) {
    this.refill();
    
    if (this.tokens >= tokens) {
      return 0; // No waiting required
    }
    
    // Calculate time needed to accumulate required tokens
    const tokensNeeded = tokens - this.tokens;
    return Math.ceil(tokensNeeded / this.refillRate);
  }
}

// Usage example with Express middleware
function createRateLimiterMiddleware(requestsPerMinute, tokensPerRequest = 1) {
  // 60000 milliseconds in a minute
  const refillRate = requestsPerMinute / 60000;  
  const bucket = new TokenBucket(requestsPerMinute, refillRate);
  
  return function rateLimiterMiddleware(req, res, next) {
    const waitTime = bucket.getWaitTimeInMs(tokensPerRequest);
    
    if (waitTime === 0) {
      // Tokens available, consume them
      bucket.take(tokensPerRequest);
      next();
    } else {
      // Tokens not available, respond with 429 Too Many Requests
      res.status(429).json({
        error: 'Too many requests, please try again later',
        retryAfter: Math.ceil(waitTime / 1000) // seconds
      });
      
      // Alternatively, wait and then process (not recommended for web servers)
      // setTimeout(next, waitTime);
    }
  };
}

// Add the middleware to your Express app
const app = express();
app.use('/api/openai', createRateLimiterMiddleware(60)); // 60 requests per minute

 

Step 9: Advanced Prioritization and Fair Queuing

 

Implement priority queuing to handle requests based on user tiers:


// Advanced priority queue implementation
class PriorityQueue {
  constructor() {
    // Separate queues for different priority levels
    this.queues = {
      high: [],    // Premium users
      medium: [],  // Standard users
      low: []      // Free users
    };
    
    // Define weights for fair scheduling
    this.weights = {
      high: 4,    // Process 4 high priority requests
      medium: 2,  // Then 2 medium priority requests
      low: 1      // Then 1 low priority request
    };
    
    this.counters = {
      high: 0,
      medium: 0,
      low: 0
    };
    
    this.processing = false;
  }
  
  add(priority, data) {
    return new Promise((resolve, reject) => {
      if (!this.queues[priority]) {
        return reject(new Error(`Invalid priority: ${priority}`));
      }
      
      // Add to appropriate queue
      this.queues[priority].push({
        data,
        resolve,
        reject,
        addedAt: Date.now()
      });
      
      // Start processing if not already running
      if (!this.processing) {
        this.process();
      }
    });
  }
  
  getNextPriority() {
    // Increment counters and determine which queue to process next
    const priorities = Object.keys(this.weights);
    
    // First check if any queue is empty and skip it
    for (const priority of priorities) {
      if (this.queues[priority].length === 0) {
        this.counters[priority] = 0;
      }
    }
    
    // Check counters against weights
    for (const priority of priorities) {
      if (this.queues[priority].length > 0 && 
          this.counters[priority] < this.weights[priority]) {
        this.counters[priority]++;
        return priority;
      }
    }
    
    // If all counters have reached their weights, reset them
    for (const priority of priorities) {
      this.counters[priority] = 0;
    }
    
    // Find first non-empty queue
    for (const priority of priorities) {
      if (this.queues[priority].length > 0) {
        this.counters[priority]++;
        return priority;
      }
    }
    
    return null; // All queues are empty
  }
  
  async process() {
    this.processing = true;
    
    // Get the next priority level to process
    const priority = this.getNextPriority();
    
    if (!priority) {
      this.processing = false;
      return; // No items to process
    }
    
    // Get the next item from the queue
    const item = this.queues[priority].shift();
    
    try {
      // Process the item (e.g., make OpenAI request)
      const result = await processOpenAIRequest(item.data);
      item.resolve(result);
    } catch (error) {
      item.reject(error);
    }
    
    // Continue processing
    setImmediate(() => this.process());
  }
  
  // Helper method to get queue statistics
  getStats() {
    const stats = {};
    
    for (const priority in this.queues) {
      stats[priority] = {
        length: this.queues[priority].length,
        oldestItem: this.queues[priority].length > 0 
          ? Date.now() - this.queues\[priority]\[0].addedAt
          : 0
      };
    }
    
    return stats;
  }
}

// Function to process OpenAI request
async function processOpenAIRequest(data) {
  // Rate limiting logic
  const tokenBucket = new TokenBucket(60, 60/60000); // 60 RPM
  
  if (!tokenBucket.take()) {
    const waitTime = tokenBucket.getWaitTimeInMs();
    await new Promise(resolve => setTimeout(resolve, waitTime));
  }
  
  // Make the actual OpenAI request
  const completion = await openai.createCompletion({
    model: data.model || "text-davinci-003",
    prompt: data.prompt,
    max\_tokens: 1000
  });
  
  return completion.data.choices[0].text;
}

// Express route using the priority queue
const priorityQueue = new PriorityQueue();

app.post('/api/generate', async (req, res) => {
  const { prompt, model, userId, userTier } = req.body;
  
  // Determine priority based on user tier
  let priority;
  switch (userTier) {
    case 'premium':
      priority = 'high';
      break;
    case 'standard':
      priority = 'medium';
      break;
    default:
      priority = 'low';
  }
  
  try {
    // Add to priority queue
    const result = await priorityQueue.add(priority, {
      prompt,
      model,
      userId
    });
    
    res.json({ result });
  } catch (error) {
    console.error('Error processing request:', error);
    res.status(500).json({ error: error.message });
  }
});

// Endpoint to check queue status
app.get('/api/queue-status', (req, res) => {
  res.json(priorityQueue.getStats());
});

 

Step 10: Monitoring and Debugging Your Queue System

 

Implement monitoring to track queue performance:


const express = require('express');
const prometheus = require('prom-client');

// Create a Registry to register metrics
const register = new prometheus.Registry();

// Add default metrics (GC, memory usage, etc.)
prometheus.collectDefaultMetrics({ register });

// Create custom metrics
const queueSizeGauge = new prometheus.Gauge({
  name: 'openai_queue_size',
  help: 'Current size of the OpenAI request queue',
  labelNames: ['priority']
});

const requestDurationHistogram = new prometheus.Histogram({
  name: 'openai_request_duration\_seconds',
  help: 'Histogram of OpenAI request processing time',
  labelNames: ['model', 'status']
});

const rateLimit = new prometheus.Gauge({
  name: 'openai_rate_limit\_remaining',
  help: 'Remaining capacity before hitting rate limit'
});

// Register the metrics
register.registerMetric(queueSizeGauge);
register.registerMetric(requestDurationHistogram);
register.registerMetric(rateLimit);

// Middleware to update metrics
function updateMetrics(req, res, next) {
  // Record request time
  const end = requestDurationHistogram.startTimer();
  
  // Update on response finish
  res.on('finish', () => {
    end({ model: req.body.model || 'unknown', status: res.statusCode });
  });
  
  next();
}

// Setup Express app
const app = express();
app.use(express.json());
app.use('/api', updateMetrics);

// Expose metrics endpoint for Prometheus
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

// Example function to update queue metrics
function updateQueueMetrics(queue) {
  // Update queue size metrics
  for (const priority in queue.queues) {
    queueSizeGauge.set({ priority }, queue.queues[priority].length);
  }
  
  // Update rate limit metrics
  rateLimit.set(rateLimiter.tokens);
}

// Call this function periodically
setInterval(() => {
  updateQueueMetrics(priorityQueue);
}, 5000);

// Start server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.log(`Metrics server running on port ${PORT}`);
});

 

Step 11: Implementing Graceful Error Handling and Retries

 

Add robust error handling and retry logic:


// Utility for exponential backoff retries
class RetryHandler {
  constructor(maxRetries = 3, initialDelay = 1000, maxDelay = 30000) {
    this.maxRetries = maxRetries;
    this.initialDelay = initialDelay;
    this.maxDelay = maxDelay;
  }
  
  async retry(fn, context = {}) {
    let lastError;
    
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        // Try to execute the function
        return await fn(attempt);
      } catch (error) {
        lastError = error;
        
        // Check if we should retry based on the error
        if (!this.isRetryable(error) || attempt >= this.maxRetries) {
          break;
        }
        
        // Calculate delay with exponential backoff and jitter
        const delay = Math.min(
          this.maxDelay,
          this.initialDelay _ Math.pow(2, attempt) _ (0.8 + Math.random() \* 0.4)
        );
        
        console.log(`Retry attempt ${attempt + 1}/${this.maxRetries} after ${delay}ms`);
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
    
    // If we get here, all retries failed
    throw lastError;
  }
  
  isRetryable(error) {
    // Determine if an error should trigger a retry
    
    // Network errors are generally retryable
    if (error.code === 'ECONNRESET' || 
        error.code === 'ETIMEDOUT' || 
        error.code === 'ECONNREFUSED') {
      return true;
    }
    
    // API rate limits should be retried
    if (error.response && error.response.status === 429) {
      return true;
    }
    
    // Server errors may be retryable
    if (error.response && error.response.status >= 500 && error.response.status < 600) {
      return true;
    }
    
    // Some OpenAI-specific errors are retryable
    if (error.response && 
        error.response.data && 
        error.response.data.error && 
        (error.response.data.error.type === 'server\_error' || 
         error.response.data.error.type === 'rate_limit_exceeded')) {
      return true;
    }
    
    return false;
  }
}

// Usage example
const retryHandler = new RetryHandler(3, 1000, 30000);

async function processRequestWithRetries(data) {
  return await retryHandler.retry(async (attempt) => {
    console.log(`Processing request, attempt ${attempt + 1}`);
    
    // OpenAI API call
    const completion = await openai.createCompletion({
      model: data.model || "text-davinci-003",
      prompt: data.prompt,
      max\_tokens: 1000
    });
    
    return completion.data.choices[0].text;
  });
}

// Express route using retry handler
app.post('/api/generate-with-retry', async (req, res) => {
  const { prompt, model } = req.body;
  
  if (!prompt) {
    return res.status(400).json({ error: 'Prompt is required' });
  }
  
  try {
    const result = await processRequestWithRetries({
      prompt,
      model
    });
    
    res.json({ result });
  } catch (error) {
    console.error('Error after all retries:', error);
    
    // Provide helpful error message based on the type of error
    if (error.response && error.response.status === 429) {
      res.status(429).json({ 
        error: 'Rate limit exceeded. Please try again later.',
        retryAfter: error.response.headers['retry-after'] || 60
      });
    } else if (error.response && error.response.status >= 400) {
      res.status(error.response.status).json({ 
        error: error.response.data.error.message || 'API error'
      });
    } else {
      res.status(500).json({ error: 'Failed to process request' });
    }
  }
});

 

Step 12: Implementing a Complete End-to-End Solution

 

Here's a full implementation tying all the concepts together:


// server.js - Main server file
require('dotenv').config();
const express = require('express');
const Bull = require('bull');
const { v4: uuidv4 } = require('uuid');
const { Configuration, OpenAIApi } = require("openai");
const prometheusClient = require('prom-client');

// Initialize Express app
const app = express();
app.use(express.json());
app.use(express.static('public'));

// Set up Prometheus metrics
const register = new prometheusClient.Registry();
prometheusClient.collectDefaultMetrics({ register });

// Create custom metrics
const queueSizeGauge = new prometheusClient.Gauge({
  name: 'openai_queue_size',
  help: 'Current size of the OpenAI request queue',
  labelNames: ['priority']
});

const requestDurationHistogram = new prometheusClient.Histogram({
  name: 'openai_request_duration\_seconds',
  help: 'Histogram of OpenAI request processing time',
  labelNames: ['model', 'status']
});

register.registerMetric(queueSizeGauge);
register.registerMetric(requestDurationHistogram);

// Initialize OpenAI API
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Create Bull queues for different priorities
const queues = {
  high: new Bull('openai-high-priority', {
    redis: {
      host: process.env.REDIS\_HOST || 'localhost',
      port: process.env.REDIS\_PORT || 6379,
      password: process.env.REDIS\_PASSWORD
    },
    limiter: {
      max: 40, // Higher throughput for premium users
      duration: 60000 // 1 minute
    }
  }),
  
  standard: new Bull('openai-standard-priority', {
    redis: {
      host: process.env.REDIS\_HOST || 'localhost',
      port: process.env.REDIS\_PORT || 6379,
      password: process.env.REDIS\_PASSWORD
    },
    limiter: {
      max: 20, // Standard throughput
      duration: 60000
    }
  }),
  
  low: new Bull('openai-low-priority', {
    redis: {
      host: process.env.REDIS\_HOST || 'localhost',
      port: process.env.REDIS\_PORT || 6379,
      password: process.env.REDIS\_PASSWORD
    },
    limiter: {
      max: 10, // Lower throughput for free users
      duration: 60000
    }
  })
};

// Retry utility
class RetryHandler {
  constructor(maxRetries = 3, initialDelay = 1000, maxDelay = 30000) {
    this.maxRetries = maxRetries;
    this.initialDelay = initialDelay;
    this.maxDelay = maxDelay;
  }
  
  async retry(fn) {
    let lastError;
    
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await fn(attempt);
      } catch (error) {
        lastError = error;
        
        if (!this.isRetryable(error) || attempt >= this.maxRetries) {
          break;
        }
        
        const delay = Math.min(
          this.maxDelay,
          this.initialDelay _ Math.pow(2, attempt) _ (0.8 + Math.random() \* 0.4)
        );
        
        console.log(`Retry attempt ${attempt + 1}/${this.maxRetries} after ${delay}ms`);
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
    
    throw lastError;
  }
  
  isRetryable(error) {
    // Network errors
    if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') {
      return true;
    }
    
    // Rate limits and server errors
    if (error.response && (error.response.status === 429 || 
        (error.response.status >= 500 && error.response.status < 600))) {
      return true;
    }
    
    return false;
  }
}

const retryHandler = new RetryHandler();

// Process jobs from each queue
for (const [priority, queue] of Object.entries(queues)) {
  queue.process(async (job) => {
    const { prompt, model, userId } = job.data;
    const startTime = Date.now();
    
    console.log(`Processing ${priority} priority job ${job.id} for user ${userId}`);
    
    try {
      const result = await retryHandler.retry(async () => {
        const completion = await openai.createCompletion({
          model: model || "text-davinci-003",
          prompt: prompt,
          max\_tokens: 1000
        });
        
        return completion.data.choices[0].text;
      });
      
      const duration = (Date.now() - startTime) / 1000;
      requestDurationHistogram.observe({ model: model || "text-davinci-003", status: 'success' }, duration);
      
      return { result };
    } catch (error) {
      const duration = (Date.now() - startTime) / 1000;
      requestDurationHistogram.observe({ model: model || "text-davinci-003", status: 'error' }, duration);
      
      console.error(`Error processing job ${job.id}:`, error);
      throw new Error(`Failed to process OpenAI request: ${error.message}`);
    }
  });
  
  // Handle completion and failure events
  queue.on('completed', (job, result) => {
    console.log(`Job ${job.id} completed with result`);
  });
  
  queue.on('failed', (job, error) => {
    console.error(`Job ${job.id} failed with error:`, error.message);
  });
  
  // Update queue metrics every 5 seconds
  setInterval(async () => {
    const jobCounts = await queue.getJobCounts();
    queueSizeGauge.set({ priority }, jobCounts.waiting + jobCounts.active);
  }, 5000);
}

// API routes
app.post('/api/generate', async (req, res) => {
  const { prompt, model, userId, userTier } = req.body;
  
  if (!prompt) {
    return res.status(400).json({ error: 'Prompt is required' });
  }
  
  // Determine queue based on user tier
  let queue;
  switch (userTier) {
    case 'premium':
      queue = queues.high;
      break;
    case 'standard':
      queue = queues.standard;
      break;
    default:
      queue = queues.low;
  }
  
  try {
    // Add job to the appropriate queue
    const job = await queue.add({
      prompt,
      model: model || 'text-davinci-003',
      userId: userId || 'anonymous'
    }, {
      removeOnComplete: 100, // Keep last 100 completed jobs
      removeOnFail: 100,     // Keep last 100 failed jobs
      attempts: 3,           // Retry up to 3 times
      backoff: {
        type: 'exponential',
        delay: 1000          // Initial delay of 1 second
      }
    });
    
    // Return the job ID to the client
    res.json({
      message: 'Request queued successfully',
      jobId: job.id,
      priority: userTier || 'free'
    });
  } catch (error) {
    console.error('Failed to queue request:', error);
    res.status(500).json({ error: 'Failed to queue request' });
  }
});

// Get job status
app.get('/api/status/:jobId', async (req, res) => {
  const { jobId } = req.params;
  const { priority } = req.query;
  
  if (!priority || !queues[priority]) {
    return res.status(400).json({ error: 'Valid priority must be specified' });
  }
  
  try {
    const job = await queues[priority].getJob(jobId);
    
    if (!job) {
      return res.status(404).json({ error: 'Job not found' });
    }
    
    const state = await job.getState();
    const result = job.returnvalue;
    
    res.json({
      id: job.id,
      state,
      result: state === 'completed' ? result : null,
      reason: job.failedReason
    });
  } catch (error) {
    console.error('Error getting job status:', error);
    res.status(500).json({ error: 'Failed to get job status' });
  }
});

// Queue statistics endpoint
app.get('/api/queue-stats', async (req, res) => {
  try {
    const stats = {};
    
    for (const [priority, queue] of Object.entries(queues)) {
      const jobCounts = await queue.getJobCounts();
      stats[priority] = jobCounts;
    }
    
    res.json(stats);
  } catch (error) {
    console.error('Error getting queue stats:', error);
    res.status(500).json({ error: 'Failed to get queue statistics' });
  }
});

// Prometheus metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

// Health check endpoint
app.get('/health', (req, res) => {
  res.json({ status: 'ok' });
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

 

Step 13: Creating a Client-Side Polling System

 

Implement a client-side solution to check job status:





    
    
    OpenAI Text Generator
    


    

OpenAI Text Generator

 

Step 14: Deploying Your Queue System

 

Here's a Docker Compose setup for deploying your queue system:


version: '3.8'

services:
  # Redis for queue storage
  redis:
    image: redis:alpine
    ports:
    - "6379:6379"
    volumes:
    - redis-data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped
    
  # API server
  api-server:
    build:
      context: .
      dockerfile: Dockerfile.server
    ports:
    - "3000:3000"
    environment:
    - NODE\_ENV=production
    - PORT=3000
    - REDIS\_HOST=redis
    - REDIS\_PORT=6379
    - OPENAI_API_KEY=${OPENAI_API_KEY}
    - MAX_REQUESTS_PER\_MINUTE=60
    depends\_on:
    - redis
    restart: unless-stopped
    
  # Worker processes
  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
    - NODE\_ENV=production
    - REDIS\_HOST=redis
    - REDIS\_PORT=6379
    - OPENAI_API_KEY=${OPENAI_API_KEY}
    - MAX_REQUESTS_PER\_MINUTE=60
    depends\_on:
    - redis
    restart: unless-stopped
    # Scale to multiple workers
    deploy:
      replicas: 3
      
  # Prometheus for monitoring
  prometheus:
    image: prom/prometheus
    ports:
    - "9090:9090"
    volumes:
    - ./prometheus.yml:/etc/prometheus/prometheus.yml
    - prometheus-data:/prometheus
    restart: unless-stopped
    
  # Grafana for dashboards
  grafana:
    image: grafana/grafana
    ports:
    - "3001:3000"
    volumes:
    - grafana-data:/var/lib/grafana
    depends\_on:
    - prometheus
    restart: unless-stopped

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

Create a Dockerfile for the server:


# Dockerfile.server
FROM node:18-alpine

WORKDIR /app

COPY package\*.json ./
RUN npm install --production

COPY . .

EXPOSE 3000

CMD ["node", "server.js"]

Create a Dockerfile for the worker:


# Dockerfile.worker
FROM node:18-alpine

WORKDIR /app

COPY package\*.json ./
RUN npm install --production

COPY . .

CMD ["node", "worker.js"]

Set up Prometheus configuration:


# prometheus.yml
global:
  scrape\_interval: 15s

scrape\_configs:
- job\_name: 'openai-queue'
    scrape\_interval: 5s
    static\_configs:
    - targets: ['api-server:3000']
        labels:
          service: 'api-server'

 

Step 15: Testing and Optimizing Your Queue System

 

Create a load testing script to verify your queue works properly:


// load-test.js
const axios = require('axios');

const API\_URL = 'http://localhost:3000/api/generate';
const NUM\_REQUESTS = 100;
const CONCURRENCY = 20;

async function sendRequest(id) {
  try {
    console.log(`Starting request ${id}`);
    const startTime = Date.now();
    
    const response = await axios.post(API\_URL, {
      prompt: `Generate a creative story about a ${['robot', 'wizard', 'astronaut', 'detective'][id % 4]} in ${Math.floor(Math.random() * 100)} words.`,
      model: 'text-davinci-003',
      userTier: \['free', 'standard', 'premium']\[id % 3],
      userId: `load-test-user-${id}`
    });
    
    const jobId = response.data.jobId;
    const priority = response.data.priority;
    
    console.log(`Request ${id} queued with job ID ${jobId} (priority: ${priority})`);
    
    // Poll for results
    let completed = false;
    let attempts = 0;
    
    while (!completed && attempts < 30) {
      await new Promise(resolve => setTimeout(resolve, 2000));
      attempts++;
      
      try {
        const statusResponse = await axios.get(`http://localhost:3000/api/status/${jobId}?priority=${priority}`);
        const data = statusResponse.data;
        
        if (data.state === 'completed') {
          const totalTime = Date.now() - startTime;
          console.log(`Request ${id} completed in ${totalTime}ms`);
          completed = true;
        } else if (data.state === 'failed') {
          console.error(`Request ${id} failed: ${data.reason}`);
          completed = true;
        } else {
          console.log(`Request ${id} status: ${data.state} (attempt ${attempts})`);
        }
      } catch (error) {
        console.error(`Error checking status for request ${id}:`, error.message);
      }
    }
    
    if (!completed) {
      console.error(`Request ${id} timed out after ${attempts} attempts`);
    }
  } catch (error) {
    console.error(`Error sending request ${id}:`, error.message);
  }
}

async function runLoadTest() {
  console.log(`Starting load test with ${NUM_REQUESTS} requests (${CONCURRENCY} concurrent)`);
  
  const startTime = Date.now();
  const requests = [];
  
  // Start initial batch of concurrent requests
  for (let i = 0; i < Math.min(CONCURRENCY, NUM\_REQUESTS); i++) {
    requests.push(sendRequest(i));
  }
  
  // Add remaining requests as others complete
  let completed = 0;
  let next = CONCURRENCY;
  
  while (completed < NUM\_REQUESTS) {
    await Promise.race(requests);
    completed++;
    
    if (next < NUM\_REQUESTS) {
      requests.push(sendRequest(next));
      next++;
    }
  }
  
  const totalTime = (Date.now() - startTime) / 1000;
  console.log(`Load test completed in ${totalTime} seconds`);
  console.log(`Average request rate: ${NUM_REQUESTS / totalTime} req/sec`);
}

runLoadTest().catch(error => {
  console.error('Load test failed:', error);
});

 

Step 16: Handling Large Language Model (LLM) Specific Considerations

 

For models that require token counting, implement this utility:


// token-counter.js
const { encode } = require('gpt-3-encoder');

class TokenCounter {
  constructor(maxTokensPerMinute = 90000) {
    this.maxTokensPerMinute = maxTokensPerMinute;
    this.tokensThisMinute = 0;
    this.lastResetTime = Date.now();
  }
  
  resetIfNeeded() {
    const now = Date.now();
    if (now - this.lastResetTime > 60000) {
      console.log(`Resetting token counter. Used ${this.tokensThisMinute} tokens in the last minute.`);
      this.tokensThisMinute = 0;
      this.lastResetTime = now;
      return true;
    }
    return false;
  }
  
  countTokens(text) {
    return encode(text).length;
  }
  
  estimateTokensForCompletion(prompt, maxTokens) {
    // Count prompt tokens
    const promptTokens = this.countTokens(prompt);
    
    // Estimate total tokens (prompt + expected completion)
    // This is a rough estimate as the actual completion length varies
    const estimatedTotalTokens = promptTokens + maxTokens;
    
    return {
      promptTokens,
      estimatedTotalTokens
    };
  }
  
  canMakeRequest(estimatedTokens) {
    this.resetIfNeeded();
    return this.tokensThisMinute + estimatedTokens <= this.maxTokensPerMinute;
  }
  
  recordTokenUsage(actualTokens) {
    this.resetIfNeeded();
    this.tokensThisMinute += actualTokens;
    return this.tokensThisMinute;
  }
  
  getRemainingTokens() {
    this.resetIfNeeded();
    return Math.max(0, this.maxTokensPerMinute - this.tokensThisMinute);
  }
  
  getEstimatedWaitTime(tokensNeeded) {
    if (this.getRemainingTokens() >= tokensNeeded) {
      return 0;
    }
    
    // Calculate how much time until next reset
    const millisTillReset = 60000 - (Date.now() - this.lastResetTime);
    
    // If reset is very soon, just wait for it
    if (millisTillReset < 5000) {
      return millisTillReset;
    }
    
    // Otherwise, calculate wait time based on token rate
    return Math.ceil(millisTillReset \* (tokensNeeded / this.maxTokensPerMinute));
  }
}

module.exports = TokenCounter;

Integrate the token counter with your queue processor:


// worker-with-token-counting.js
const Bull = require('bull');
const { Configuration, OpenAIApi } = require("openai");
const TokenCounter = require('./token-counter');

// Initialize OpenAI API
const configuration = new Configuration({
  apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);

// Initialize token counter (90K tokens per minute is a common limit)
const tokenCounter = new TokenCounter(90000);

// Create queue
const openaiQueue = new Bull('openai-requests', {
  redis: {
    host: process.env.REDIS\_HOST || 'localhost',
    port: process.env.REDIS\_PORT || 6379,
    password: process.env.REDIS\_PASSWORD
  }
});

// Process jobs
openaiQueue.process(async (job) => {
  const { prompt, model, maxTokens = 1000 } = job.data;
  
  // Estimate token usage
  const { estimatedTotalTokens } = tokenCounter.estimateTokensForCompletion(prompt, maxTokens);
  
  // Check if we have enough tokens available
  if (!tokenCounter.canMakeRequest(estimatedTotalTokens)) {
    const waitTime = tokenCounter.getEstimatedWaitTime(estimatedTotalTokens);
    
    if (waitTime > 0) {
      console.log(`Token limit reached. Waiting ${waitTime}ms before processing job ${job.id}`);
      await new Promise(resolve => setTimeout(resolve, waitTime));
      
      // Recheck after waiting
      if (!tokenCounter.canMakeRequest(estimatedTotalTokens)) {
        throw new Error('Token limit still exceeded after waiting. Please try again later.');
      }
    }
  }
  
  try {
    // Process the OpenAI request
    const completion = await openai.createCompletion({
      model: model || "text-davinci-003",
      prompt: prompt,
      max\_tokens: maxTokens
    });
    
    const result = completion.data.choices[0].text;
    
    // Record actual token usage
    const actualPromptTokens = completion.data.usage.prompt\_tokens;
    const actualCompletionTokens = completion.data.usage.completion\_tokens;
    const totalTokens = completion.data.usage.total\_tokens;
    
    tokenCounter.recordTokenUsage(totalTokens);
    
    console.log(`Job ${job.id} used ${totalTokens} tokens (${actualPromptTokens} prompt, ${actualCompletionTokens} completion)`);
    console.log(`Remaining tokens this minute: ${tokenCounter.getRemainingTokens()}`);
    
    return { 
      result,
      usage: {
        promptTokens: actualPromptTokens,
        completionTokens: actualCompletionTokens,
        totalTokens: totalTokens
      }
    };
  } catch (error) {
    console.error(`Error processing job ${job.id}:`, error);
    
    // Check for token-related errors
    if (error.response && 
        error.response.data && 
        error.response.data.error && 
        error.response.data.error.type === 'tokens') {
      // If the error is token-related, update our counter to reflect the limit
      tokenCounter.recordTokenUsage(tokenCounter.maxTokensPerMinute);
    }
    
    throw error;
  }
});

// Handle events
openaiQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed successfully`);
});

openaiQueue.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed with error: ${error.message}`);
});

console.log('Worker started with token counting');

 

Conclusion

 

Queuing OpenAI requests is essential when dealing with high-traffic applications. By implementing a robust queue system, you can:

  • Prevent rate limit errors and API failures
  • Provide a better user experience even during peak loads
  • Prioritize requests based on user tiers
  • Handle retries and errors gracefully
  • Monitor your system for optimization

The solution you choose will depend on your specific needs and infrastructure. For simple applications, an in-memory queue might be sufficient. For production systems, Redis-based queues like Bull or cloud services like AWS SQS provide scalability and reliability. By following the steps in this guide, you can build a queue system that efficiently manages OpenAI API requests and provides a smooth experience for your users.

Want to explore opportunities to work with us?

Connect with our team to unlock the full potential of no-code solutions with a no-commitment consultation!

Book a Free Consultation

Client trust and success are our top priorities

When it comes to serving you, we sweat the little things. That’s why our work makes a big impact.

Rapid Dev was an exceptional project management organization and the best development collaborators I've had the pleasure of working with. They do complex work on extremely fast timelines and effectively manage the testing and pre-launch process to deliver the best possible product. I'm extremely impressed with their execution ability.

CPO, Praction - Arkady Sokolov

May 2, 2023

Working with Matt was comparable to having another co-founder on the team, but without the commitment or cost. He has a strategic mindset and willing to change the scope of the project in real time based on the needs of the client. A true strategic thought partner!

Co-Founder, Arc - Donald Muir

Dec 27, 2022

Rapid Dev are 10/10, excellent communicators - the best I've ever encountered in the tech dev space. They always go the extra mile, they genuinely care, they respond quickly, they're flexible, adaptable and their enthusiasm is amazing.

Co-CEO, Grantify - Mat Westergreen-Thorne

Oct 15, 2022

Rapid Dev is an excellent developer for no-code and low-code solutions.
We’ve had great success since launching the platform in November 2023. In a few months, we’ve gained over 1,000 new active users. We’ve also secured several dozen bookings on the platform and seen about 70% new user month-over-month growth since the launch.

Co-Founder, Church Real Estate Marketplace - Emmanuel Brown

May 1, 2024 

Matt’s dedication to executing our vision and his commitment to the project deadline were impressive. 
This was such a specific project, and Matt really delivered. We worked with a really fast turnaround, and he always delivered. The site was a perfect prop for us!

Production Manager, Media Production Company - Samantha Fekete

Sep 23, 2022