Learn how to handle concurrency issues with parallel LLM calls in n8n using rate limiting, error handling, batching, and asynchronous workflows to optimize performance and reliability.
When handling concurrency issues with parallel LLM calls in n8n, you need to implement appropriate rate limiting, error handling, and asynchronous execution strategies. This prevents overwhelming the LLM API, manages token quotas effectively, and ensures reliable workflow execution even when processing multiple requests simultaneously.
Comprehensive Guide to Handling Concurrency Issues with Parallel LLM Calls in n8n
Step 1: Understanding the Concurrency Challenges with LLM APIs
Before implementing solutions, it's important to understand the specific concurrency challenges when working with LLM APIs in n8n:
Most LLM providers enforce rate limits on both requests and tokens. For example, OpenAI's lowest usage tiers have historically allowed as few as 3 requests per minute for GPT-4, while GPT-3.5 tiers allow on the order of 3,500 requests per minute (check your provider's current limits for exact figures). Exceeding these limits returns 429 Too Many Requests errors that can break your workflow, and parallel branches make it much easier to hit them without noticing.
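As a quick sanity check before building anything, a published requests-per-minute limit translates directly into the minimum spacing your workflow must keep between calls. A minimal illustration (the 3 RPM figure is an example tier, not necessarily your account's limit):
// Minimum spacing between calls implied by a requests-per-minute limit
const requestsPerMinute = 3; // example: a low GPT-4 tier
const minSpacingMs = 60000 / requestsPerMinute; // 20000 ms, i.e. at most one call every 20 seconds
console.log(`Keep at least ${minSpacingMs} ms between requests`);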
Step 2: Setting Up the Environment in n8n
To properly handle concurrent LLM calls, first ensure your n8n environment is properly configured:
Review your execution settings (the per-workflow Settings panel in the editor and, on self-hosted deployments, the instance-level environment variables) and adjust the parameters that affect long-running parallel work:
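The exact menu layout varies between n8n versions, so this sketch lists the settings that typically matter rather than exact field names:
// Execution settings that matter for parallel LLM calls
// - Workflow timeout: raise it so long LLM batches are not killed mid-run
// - Save failed (and optionally successful) executions: keep the data needed to inspect retries
// - Execution order: keep branch execution order predictable when results are merged later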
Step 3: Implementing Rate Limiting with the Split In Batches Node
The Split In Batches node is crucial for controlling the flow of requests to LLM APIs:
// Example workflow structure
// Trigger → Split In Batches → OpenAI Node → Wait → back to Split In Batches (loop)
// "done" output of Split In Batches → Merge Node → rest of the workflow
Configure the Split In Batches node:
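A minimal configuration sketch (the node is labelled "Loop Over Items" in recent n8n versions; values are illustrative):
// Split In Batches node configuration
{
"batchSize": 3
}
// Put a Wait node (e.g. 60 seconds) on the loop branch next to the LLM node,
// then route the branch back into Split In Batches to pick up the next batch.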
This configuration ensures you stay within rate limits by processing items in controlled batches with appropriate intervals.
Step 4: Using Function Nodes for Advanced Rate Control
For more sophisticated rate limiting, implement a Function node with dynamic delays:
// Function node before LLM API calls
// Spaces requests at a fixed interval so the batch stays under the provider's rate limit
const items = $input.all();
const rateLimit = 3; // Requests per minute
const interval = 60000 / rateLimit; // Time between requests in ms
// Return items with delays
return items.map((item, index) => {
const delay = index * interval;
return {
json: {
...item.json,
delay,
scheduledTime: new Date(Date.now() + delay).toISOString()
},
pairedItem: item.pairedItem,
};
});
After this node, add a "Wait" node that uses the dynamic delay value from each item.
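A sketch of that Wait node resuming after a per-item time interval; the expression assumes the delay field (in milliseconds) set by the Function node above, and the exact parameter names vary slightly between n8n versions:
// Wait node configuration
{
"resume": "timeInterval",
"amount": "={{ $json.delay / 1000 }}",
"unit": "seconds"
}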
Step 5: Implementing Error Handling and Retries
Robust error handling is essential for parallel LLM calls. Implement the following approach:
Function node for error handling with retry logic (enable "Continue On Fail" / "On Error → Continue" on the LLM node so failed items carry their error into the item JSON instead of stopping the workflow):
// Add this Function node after your LLM node
const items = $input.all();
const results = [];
for (const item of items) {
try {
// If this is a retry, check how many attempts we've made
const retryCount = item.json.retryCount || 0;
// Check if the item contains an error (e.g., rate limit exceeded)
if (item.json.error && item.json.error.includes('rate_limit_exceeded')) {
// Implement exponential backoff
const backoffTime = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s, 8s...
if (retryCount < 5) { // Maximum 5 retry attempts
// Schedule for retry
results.push({
json: {
...item.json,
retryCount: retryCount + 1,
retryAfter: backoffTime,
status: 'scheduled_retry'
},
pairedItem: item.pairedItem
});
continue;
}
}
// No error or max retries exceeded, pass through the item
results.push(item);
} catch (error) {
// Handle unexpected errors
results.push({
json: {
...item.json,
error: error.message,
status: 'error'
},
pairedItem: item.pairedItem
});
}
}
return results;
Step 6: Creating a Wait-Retry Loop for Failed Requests
For items that need to be retried, implement a loop structure:
// Function Node to check if item needs retry
const item = $input.first();
// If item is scheduled for retry, return true to continue loop
if (item.json.status === 'scheduled_retry') {
return [
{ json: { continueLoop: true, retryAfter: item.json.retryAfter } }
];
}
// Otherwise exit the loop
return [
{ json: { continueLoop: false, result: item.json } }
];
Connect this to an IF node that checks continueLoop; when it is true, route to a Wait node that pauses for the retryAfter value before looping back to your LLM node.
Step 7: Using Webhook Nodes for Asynchronous Processing
For large-scale parallel processing, implement an asynchronous webhook approach:
Webhook workflow setup:
// Webhook Trigger node configuration
{
"path": "process-llm-request",
"responseMode": "lastNode",
"responseData": "allEntries"
}
// Then connect to your LLM node (e.g., OpenAI)
// Finally connect to a Set node that formats the response
Main workflow to call the webhook:
// HTTP Request node configuration
{
"url": "http://your-n8n-instance/webhook/process-llm-request",
"method": "POST",
"sendBody": true,
"bodyParameters": {
"prompt": "={{$json.prompt}}",
"requestId": "={{$json.id}}",
"parameters": "={{$json.parameters}}"
}
}
Step 8: Managing Token Usage with a Function Node
Implement token tracking to avoid exceeding quotas:
// Function node to track token usage
let tokenCount = 0;
const MAX_TOKENS = 100000; // Your API's token limit
const items = $input.all();
const results = [];
for (const item of items) {
// Estimate tokens (basic approximation)
const promptTokens = Math.ceil(item.json.prompt.length / 4);
const maxResponseTokens = item.json.parameters?.max_tokens || 1000;
// Check if this would exceed our token budget
if (tokenCount + promptTokens + maxResponseTokens > MAX_TOKENS) {
// Schedule for later processing
results.push({
json: {
...item.json,
status: 'deferred',
reason: 'token_limit_approaching'
},
pairedItem: item.pairedItem
});
continue;
}
// Update token count and pass through
tokenCount += promptTokens + maxResponseTokens;
results.push(item);
}
// Track token usage in workflow static data so other nodes can read it
const staticData = $getWorkflowStaticData('global');
staticData.tokenUsage = tokenCount;
return results;
Step 9: Implementing a Queue System with Workflow Static Data
For advanced queue management, implement a simple queue using workflow static data ($getWorkflowStaticData), which persists between production executions (it is not saved for manual test runs):
// Function node to manage queue
// Queue state lives in workflow static data (persists between production executions)
const staticData = $getWorkflowStaticData('global');
let queue = staticData.llmRequestQueue || [];
const MAX_CONCURRENT = 3; // Maximum concurrent requests
const currentProcessing = staticData.currentProcessing || 0;
// Get new items to process
const newItems = $input.all();
// Add new items to queue
queue = [...queue, ...newItems];
// Select items to process now (never a negative count)
const availableSlots = Math.max(0, MAX_CONCURRENT - currentProcessing);
const itemsToProcess = queue.splice(0, availableSlots);
// Update workflow static data
staticData.llmRequestQueue = queue;
staticData.currentProcessing = currentProcessing + itemsToProcess.length;
// Return items to process
return itemsToProcess;
Add another Function node after your LLM processing to update the queue:
// Update processing count after completion
const staticData = $getWorkflowStaticData('global');
const processedItems = $input.all();
const currentProcessing = staticData.currentProcessing || 0;
// Release one slot per completed item
staticData.currentProcessing = Math.max(0, currentProcessing - processedItems.length);
// Return processed items
return processedItems;
Step 10: Creating a Complete Parallel Processing Workflow
Now let's put everything together in a complete workflow:
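The exact layout depends on your use case, but one reasonable way to wire the previous steps together looks like this (a comment sketch of the node sequence, not a prescriptive design):
// Trigger (Webhook or Cron)
// → Queue Restoration (Function, Step 18)
// → Cost-Aware Scheduler (Function, Step 15)
// → Queue Manager (Function, Step 9)
// → Split In Batches (Step 3)
// → Rate-Limit Delay (Function, Step 4) → Wait
// → Circuit Breaker (Function, Step 13)
// → LLM node with "Continue On Fail" enabled
// → Error Handler with retries (Function, Steps 5-6) → Circuit Breaker Update
// → Processing-Count Update (Function, Step 9) → Metrics Collection (Step 19)
// → loop back to Split In Batches; run Queue Persistence (Step 18) on the "done" branch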
Step 11: Optimizing Performance with Execution Mode Settings
Configure execution settings for optimal performance:
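On a self-hosted instance these are usually controlled through environment variables; the names below follow the n8n documentation but should be verified against your version, and the values are only illustrative:
// Illustrative execution settings for a self-hosted instance
// EXECUTIONS_MODE=queue → run executions through a Redis-backed queue with separate worker processes
// EXECUTIONS_TIMEOUT=3600 → allow long-running LLM workflows (seconds)
// EXECUTIONS_DATA_SAVE_ON_ERROR=all → keep failed executions so retry behavior can be debugged
// N8N_CONCURRENCY_PRODUCTION_LIMIT=10 → cap concurrent production executions (newer n8n versions)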
These settings ensure your workflow can handle concurrent execution effectively.
Step 12: Monitoring and Debugging Concurrent LLM Calls
Implement monitoring to track your workflow execution:
// Function node for logging and monitoring
const item = $input.first();
// Log important metrics
console.log(`Processing item ID: ${item.json.id}`);
const staticData = $getWorkflowStaticData('global');
console.log(`Current queue length: ${staticData.llmRequestQueue?.length || 0}`);
console.log(`Currently processing: ${staticData.currentProcessing || 0}`);
console.log(`Total token usage: ${staticData.tokenUsage || 0}`);
// You can also send metrics to external monitoring
if (item.json.status === 'error') {
// Log errors more prominently
console.error(`Error processing item ${item.json.id}: ${item.json.error}`);
}
return [item];
Place this node at strategic points in your workflow to monitor execution.
Step 13: Implementing Circuit Breaker Pattern
To prevent cascading failures, implement a circuit breaker pattern:
// Circuit Breaker Function Node
// Place this before your LLM calls
const ERROR_THRESHOLD = 5;
const RESET_TIMEOUT = 60000; // 1 minute
// Get circuit state from workflow static data
const staticData = $getWorkflowStaticData('global');
let circuitState = staticData.circuitState || {
status: 'closed', // closed, open, half-open
failures: 0,
lastFailure: null,
openedAt: null
};
// Check if circuit should reset from open to half-open
if (circuitState.status === 'open' &&
(Date.now() - circuitState.openedAt) > RESET_TIMEOUT) {
circuitState.status = 'half-open';
console.log('Circuit switched to half-open state');
}
// Process based on circuit state
if (circuitState.status === 'open') {
// Circuit is open, fast-fail all requests
return $input.all().map(item => ({
json: {
...item.json,
error: 'Circuit breaker open, request rejected',
status: 'rejected'
},
pairedItem: item.pairedItem
}));
}
if (circuitState.status === 'half-open') {
// In half-open state, only let one request through to test
const testItem = $input.first();
staticData.circuitState = circuitState;
return [testItem];
}
// Circuit is closed, normal operation
staticData.circuitState = circuitState;
return $input.all();
Add a Function node after your LLM node to update the circuit state:
// Circuit Breaker Status Update
const items = $input.all();
const ERROR_THRESHOLD = 5; // Keep in sync with the threshold in the breaker node
const staticData = $getWorkflowStaticData('global');
let circuitState = staticData.circuitState || {
status: 'closed',
failures: 0,
lastFailure: null,
openedAt: null
};
// Check for errors
const hasErrors = items.some(item =>
item.json.error && item.json.error.includes('rate_limit_exceeded')
);
if (hasErrors) {
// Increment failure counter
circuitState.failures++;
circuitState.lastFailure = Date.now();
// Check if we should open the circuit
if (circuitState.failures >= ERROR_THRESHOLD) {
circuitState.status = 'open';
circuitState.openedAt = Date.now();
console.log('Circuit breaker opened due to too many failures');
}
} else if (circuitState.status === 'half-open') {
// Success in half-open state, close the circuit
circuitState.status = 'closed';
circuitState.failures = 0;
console.log('Circuit breaker closed after successful test request');
}
staticData.circuitState = circuitState;
return items;
Step 14: Handling Different LLM Providers with Different Rate Limits
If you're using multiple LLM providers, implement provider-specific handling:
// Function node for multi-provider routing
const items = $input.all();
const results = [];
// Define provider rate limits
const providers = {
'openai': { rateLimit: 3, tokenLimit: 100000 }, // 3 RPM for GPT-4
'anthropic': { rateLimit: 10, tokenLimit: 150000 }, // Example
'cohere': { rateLimit: 20, tokenLimit: 100000 } // Example
};
// Route items based on provider
for (const item of items) {
const provider = item.json.provider || 'openai'; // Default provider
const providerConfig = providers[provider] || providers['openai']; // Fall back for unknown providers
// Add provider-specific metadata
results.push({
json: {
...item.json,
rateLimit: providerConfig.rateLimit,
tokenLimit: providerConfig.tokenLimit,
providerConfig
},
pairedItem: item.pairedItem
});
}
return results;
Then use a Switch node to route items to different LLM nodes based on the provider.
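The Switch node's raw parameter layout differs between n8n versions, so here is a comment sketch of the routing rules rather than exact node JSON; each branch can then use its own Split In Batches and Wait timing tuned to that provider's rateLimit value:
// Switch node (rules mode) routing on {{ $json.provider }}
// output 0: provider equals "openai" → OpenAI node
// output 1: provider equals "anthropic" → Anthropic node
// output 2: provider equals "cohere" → HTTP Request node calling Cohere
// fallback output: default branch (e.g. the OpenAI path)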
Step 15: Implementing a Cost-Aware Scheduler
To optimize for both performance and cost, implement a cost-aware scheduler:
// Cost-aware scheduler Function node
const items = $input.all();
const results = [];
// Define cost metrics per model
const modelCosts = {
'gpt-4': { costPer1kTokens: 0.06, priority: 1 },
'gpt-3.5-turbo': { costPer1kTokens: 0.002, priority: 2 },
'claude-2': { costPer1kTokens: 0.03, priority: 1 }
};
// Budget settings
const dailyBudget = 10.00; // $10 per day
const staticData = $getWorkflowStaticData('global');
let usedBudget = staticData.dailyBudgetUsed || 0; // Reset this value on a daily schedule elsewhere
for (const item of items) {
const model = item.json.model || 'gpt-3.5-turbo';
const costMetrics = modelCosts[model] || modelCosts['gpt-3.5-turbo'];
// Estimate cost
const promptTokens = Math.ceil(item.json.prompt.length / 4);
const maxResponseTokens = item.json.parameters?.max_tokens || 1000;
const totalTokens = promptTokens + maxResponseTokens;
const estimatedCost = (totalTokens / 1000) * costMetrics.costPer1kTokens;
// Check budget
if (usedBudget + estimatedCost > dailyBudget) {
// Over budget, use cheaper alternative or defer
if (model === 'gpt-4' && modelCosts['gpt-3.5-turbo']) {
// Downgrade to cheaper model
results.push({
json: {
...item.json,
model: 'gpt-3.5-turbo',
originalModel: model,
reason: 'budget_constraint',
estimatedCost
},
pairedItem: item.pairedItem
});
} else {
// Can't downgrade, defer processing
results.push({
json: {
...item.json,
status: 'deferred',
reason: 'budget_exceeded',
estimatedCost
},
pairedItem: item.pairedItem
});
}
continue;
}
// Within budget, process normally
usedBudget += estimatedCost;
results.push({
json: {
...item.json,
estimatedCost,
priority: costMetrics.priority
},
pairedItem: item.pairedItem
});
}
// Update budget tracking
staticData.dailyBudgetUsed = usedBudget;
// Sort by priority (lower number = higher priority); deferred items without a priority sort last
return results.sort((a, b) => (a.json.priority || 99) - (b.json.priority || 99));
Step 16: Testing and Validating Your Concurrent Processing Solution
Before putting your workflow into production, test it thoroughly: confirm that rate limits are respected, that retries and the circuit breaker behave as expected, and that queued items are neither lost nor processed twice.
You can create a test workflow with these components:
// Function Node to generate test data
const testPrompts = [
"Explain quantum computing in simple terms",
"Write a short poem about artificial intelligence",
"What are the top 5 machine learning algorithms?",
// Add more test prompts...
];
const results = [];
for (let i = 0; i < testPrompts.length; i++) {
results.push({
json: {
id: `test-${i}`,
prompt: testPrompts[i],
model: i % 3 === 0 ? 'gpt-4' : 'gpt-3.5-turbo', // Mix of models
parameters: {
max_tokens: 500,
temperature: 0.7
}
}
});
}
return results;
Then connect this to your parallel processing workflow and monitor the execution.
Step 17: Implementing Graceful Degradation
To handle extreme load situations, implement graceful degradation:
// Graceful Degradation Function Node
const items = $input.all();
const HIGH_LOAD_THRESHOLD = 20; // Number of items in queue
// Get current queue size from workflow static data
const queueSize = $getWorkflowStaticData('global').llmRequestQueue?.length || 0;
// Check if we're under high load
const highLoad = queueSize > HIGH_LOAD_THRESHOLD;
// Apply degradation strategies under high load
if (highLoad) {
return items.map(item => {
// Apply degradation strategies
const degraded = {
...item.json,
parameters: {
...item.json.parameters,
// Reduce the output token cap to conserve quota and shorten responses
max_tokens: Math.min(item.json.parameters?.max_tokens || 1000, 250)
}
};
// If using GPT-4, consider downgrading to GPT-3.5 for non-critical requests
if (item.json.model === 'gpt-4' && item.json.priority > 1) {
degraded.model = 'gpt-3.5-turbo';
degraded.originalModel = 'gpt-4';
degraded.degradationApplied = true;
}
return { json: degraded, pairedItem: item.pairedItem };
});
}
// Normal load, no degradation needed
return items;
Step 18: Persisting Queues Across Workflow Executions
To carry unfinished work over to later runs, snapshot the queue state at the end of each execution. Workflow static data already persists between production executions; for durability across restarts or multiple instances, write the snapshot to external storage (a database or Redis) instead:
// Queue Persistence Function Node
// At the end of workflow execution
const staticData = $getWorkflowStaticData('global');
// Get current queue state
const queue = staticData.llmRequestQueue || [];
const currentProcessing = staticData.currentProcessing || 0;
// Only save a snapshot if there are items remaining
if (queue.length > 0 || currentProcessing > 0) {
// Create a persistence snapshot with timestamp
staticData.persistedLlmQueue = {
queue,
currentProcessing,
timestamp: Date.now()
};
// For durability across instances, write this snapshot to external storage (DB/Redis) via a following node
console.log(`Persisted queue with ${queue.length} items`);
}
return $input.all();
Add a complementary Function node at the beginning of your workflow to restore the queue:
// Queue Restoration Function Node
// At the start of workflow execution
const staticData = $getWorkflowStaticData('global');
const persistedData = staticData.persistedLlmQueue;
if (persistedData) {
// Ignore snapshots that are too old
const MAX_AGE = 24 * 60 * 60 * 1000; // 24 hours
if ((Date.now() - persistedData.timestamp) < MAX_AGE) {
// Restore queue state
staticData.llmRequestQueue = persistedData.queue;
staticData.currentProcessing = persistedData.currentProcessing;
console.log(`Restored queue with ${persistedData.queue.length} items`);
}
// Clear the snapshot to avoid duplicate processing
delete staticData.persistedLlmQueue;
}
// Continue with normal workflow
return $input.all();
Step 19: Creating a Dashboard for Monitoring LLM Usage
Implement a monitoring dashboard using n8n:
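One lightweight approach is a second workflow that receives metrics over a webhook and appends them to a store you can chart; the node choices below are illustrative, not required:
// Dashboard workflow sketch
// Webhook Trigger (path: "llm-metrics")
// → Set node to normalize the incoming metric fields
// → Google Sheets / Postgres node to append one row per metric payload
// → (optional) IF node on the error rate → Slack or Email node for alerting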
Sample metrics collection function to add to your LLM workflow:
// Metrics Collection Function Node
// Place after LLM processing
const first = $input.first();
const metrics = {
timestamp: Date.now(),
workflowId: $workflow.id,
executionId: $execution.id,
model: first.json.model,
promptTokens: first.json.usage?.prompt_tokens || 0,
completionTokens: first.json.usage?.completion_tokens || 0,
totalTokens: first.json.usage?.total_tokens || 0,
success: !first.json.error,
error: first.json.error || null
};
// executionTime can be added by stamping items with a start time earlier in the workflow
// Attach the metrics and forward them to the dashboard webhook with a following HTTP Request node
// (e.g. POST http://your-n8n-instance/webhook/llm-metrics)
return $input.all().map(item => ({
json: { ...item.json, metrics },
pairedItem: item.pairedItem
}));
Step 20: Scaling to Enterprise-Level Workloads
For enterprise-scale LLM processing, move beyond a single workflow: run n8n in queue mode with multiple worker instances, and hand requests off through an external queue such as Redis so several consumer workflows can process LLM calls in parallel.
Example of a workflow using external Redis queue for distributed processing:
// Redis Queue Producer Function Node
// Requires a self-hosted instance with NODE_FUNCTION_ALLOW_EXTERNAL=redis so the node can require the package
// (alternatively, use n8n's built-in Redis node with stored credentials)
const { createClient } = require('redis');
const items = $input.all();
const REDIS_QUEUE_KEY = 'llm_processing_queue';
// Connect to Redis
const redis = createClient({ url: 'redis://your-redis-host:6379' });
await redis.connect();
// Add items to queue
for (const item of items) {
await redis.rPush(REDIS_QUEUE_KEY, JSON.stringify(item.json));
}
console.log(`Added ${items.length} items to Redis queue`);
await redis.quit();
return [{ json: { status: 'queued', count: items.length } }];
Create a separate consumer workflow with a Cron node that runs frequently to process the queue:
// Redis Queue Consumer Function Node
const { createClient } = require('redis');
const REDIS_QUEUE_KEY = 'llm_processing_queue';
const BATCH_SIZE = 5;
// Connect to Redis
const redis = createClient({ url: 'redis://your-redis-host:6379' });
await redis.connect();
// Get items from queue (FIFO)
const items = [];
for (let i = 0; i < BATCH_SIZE; i++) {
const entry = await redis.lPop(REDIS_QUEUE_KEY);
if (!entry) break;
items.push({ json: JSON.parse(entry) });
}
await redis.quit();
if (items.length === 0) {
console.log('No items in queue to process');
return [];
}
console.log(`Processing ${items.length} items from Redis queue`);
return items;
Conclusion
Managing concurrency issues with parallel LLM calls in n8n requires a multi-faceted approach combining rate limiting, error handling, queuing, and cost management. By implementing the techniques in this guide, you can build robust workflows that efficiently process multiple LLM requests in parallel while respecting API limitations and optimizing for both performance and cost.
Remember to adapt these solutions to your specific needs, considering your particular LLM provider's limitations and your expected workload. Start with the core components of batching and error handling, then progressively add more sophisticated features like circuit breakers, cost management, and distributed processing as your requirements grow.
Regular monitoring and testing are crucial to ensuring your concurrent processing solution remains effective as your usage patterns and LLM provider requirements evolve.