ementation
import { fetch, Agent, setGlobalDispatcher } from 'undici';
import { randomUUID } from 'node:crypto';
import { performance } from 'node:perf_hooks';
// Types
/**
 * A single unit of work inside a batch request.
 *
 * @typeParam T - Shape of the caller-supplied payload.
 */
interface BatchOperation<T> {
  /** Identifier used to correlate this operation with its entry in the response. */
  id: string;
  /** Caller-supplied data for this operation. */
  payload: T;
}
/**
 * Per-operation outcome as it appears in the batch response array.
 *
 * @typeParam T - Shape of the optional `data` attached to the result.
 */
interface BatchResult<T> {
  /** Identifier of the operation this result belongs to. */
  id: string;
  /** Whether this individual operation succeeded or failed. */
  status: 'success' | 'failed';
  /** Optional result data. */
  data?: T;
  /** Optional error description. */
  error?: string;
}
/**
 * Settings for a batch processor.
 *
 * Only `url` is mandatory. The tuning fields are optional so that callers can
 * rely on the consumer's defaults instead of having to spell out every value.
 */
interface BatchConfig {
  /** Endpoint that receives the batched POST requests. */
  url: string;
  /** Upper bound on the number of operations sent in one request. */
  maxChunkSize?: number;
  /** Lower bound the chunk size may shrink to. */
  minChunkSize?: number;
  /** Maximum number of batch requests in flight at the same time. */
  maxConcurrency?: number;
  /** Per-request timeout in milliseconds. */
  timeoutMs?: number;
  /** Number of retries after the first failed attempt. */
  retryAttempts?: number;
}
/**
 * Queues individual operations and sends them to a batch endpoint in chunks.
 *
 * Each call to {@link BatchProcessor.add} returns a promise that settles with
 * the per-operation result reported by the server. Chunk size is adapted
 * between `minChunkSize` and `maxChunkSize` based on observed latency and the
 * per-item failure rate; at most `maxConcurrency` requests are in flight.
 *
 * The processor owns a dedicated undici `Agent` and passes it explicitly to
 * every request. It deliberately does not install that agent as the global
 * dispatcher: `drain()` closes the agent, which would otherwise break every
 * other `fetch` in the process.
 *
 * @typeParam T - Shape of the operation payload.
 */
export class BatchProcessor<T> {
  private readonly config: Required<BatchConfig>;
  private readonly agent: Agent;
  private readonly queue: Array<{
    id: string;
    resolve: (res: BatchResult<T>) => void;
    reject: (err: Error) => void;
    payload: T;
  }> = [];
  private activeChunks = 0;
  private currentChunkSize: number;
  private readonly latencyHistory: number[] = [];
  /** True while a microtask that will start pending chunks is queued. */
  private flushScheduled = false;
  /** Set once `drain()` has closed the agent; further `add()` calls are rejected. */
  private closed = false;

  /**
   * @param config - Endpoint and tuning values. Missing, non-finite or
   *   out-of-range tuning values fall back to the defaults (50 / 10 / 4 /
   *   5000 ms / 2 retries). `retryAttempts` may be 0 to disable retries.
   */
  constructor(config: BatchConfig) {
    const maxChunkSize = BatchProcessor.intOr(config.maxChunkSize, 50, 1);
    this.config = {
      url: config.url,
      maxChunkSize,
      // The lower bound can never exceed the upper bound.
      minChunkSize: Math.min(BatchProcessor.intOr(config.minChunkSize, 10, 1), maxChunkSize),
      maxConcurrency: BatchProcessor.intOr(config.maxConcurrency, 4, 1),
      timeoutMs: BatchProcessor.intOr(config.timeoutMs, 5000, 1),
      retryAttempts: BatchProcessor.intOr(config.retryAttempts, 2, 0)
    };
    // Explicit connection pooling for HTTP/1.1 & HTTP/2
    this.agent = new Agent({
      keepAliveTimeout: 30_000,
      keepAliveMaxTimeout: 60_000,
      pipelining: 1, // Disable pipelining for safety with batch endpoints
      connections: this.config.maxConcurrency * this.config.maxChunkSize
    });
    this.currentChunkSize = this.config.maxChunkSize;
  }

  /**
   * Enqueues one operation.
   *
   * Dispatch is deferred to a microtask so that operations added in the same
   * synchronous burst are grouped into full chunks instead of one-item batches.
   *
   * @param payload - Data for the operation.
   * @returns A promise that resolves with the operation's successful result,
   *   or rejects if the operation failed, the batch request failed, or the
   *   server returned no result for it.
   */
  add(payload: T): Promise<BatchResult<T>> {
    if (this.closed) {
      return Promise.reject(new Error('BatchProcessor has been drained and closed'));
    }
    return new Promise((resolve, reject) => {
      this.queue.push({ id: randomUUID(), resolve, reject, payload });
      this.scheduleFlush();
    });
  }

  /** Queues a single microtask that starts pending chunks. */
  private scheduleFlush(): void {
    if (this.flushScheduled) return;
    this.flushScheduled = true;
    queueMicrotask(() => {
      this.flushScheduled = false;
      this.processQueue();
    });
  }

  /** Starts as many chunks as the concurrency limit allows. */
  private processQueue(): void {
    while (this.queue.length > 0 && this.activeChunks < this.config.maxConcurrency) {
      // Guard against a zero-sized splice, which would never empty the queue.
      const chunk = this.queue.splice(0, Math.max(1, this.currentChunkSize));
      this.activeChunks++;
      void this.runChunk(chunk);
    }
  }

  /** Executes one chunk and rejects all of its operations if the request fails. */
  private async runChunk(chunk: typeof this.queue): Promise<void> {
    try {
      await this.executeChunk(chunk);
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      for (const op of chunk) op.reject(error);
    } finally {
      this.activeChunks--;
      this.processQueue(); // Drain remaining queue
    }
  }

  /**
   * Sends one chunk, retrying transient failures, and settles its operations.
   *
   * The same idempotency key is reused across retries of the same chunk.
   *
   * @throws The last error once retries are exhausted, or immediately for
   *   failures that a retry cannot fix (non-transient 4xx, malformed response).
   */
  private async executeChunk(operations: typeof this.queue): Promise<void> {
    const idempotencyKey = randomUUID();
    // Loop-invariant: serialize once, not once per attempt.
    const body = JSON.stringify({
      operations: operations.map(op => ({ id: op.id, payload: op.payload }))
    });
    for (let attempt = 0; attempt <= this.config.retryAttempts; attempt++) {
      // Measured per attempt so backoff sleeps do not inflate the latency signal.
      const startTime = performance.now();
      let retryable = true;
      let results: BatchResult<T>[];
      let latency: number;
      try {
        const response = await fetch(this.config.url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-Idempotency-Key': idempotencyKey,
            'X-Batch-Size': operations.length.toString()
          },
          body,
          dispatcher: this.agent,
          signal: AbortSignal.timeout(this.config.timeoutMs)
        });
        if (!response.ok) {
          retryable = BatchProcessor.isRetryableStatus(response.status);
          // Discard the unread body so the pooled connection can be reused.
          await response.body?.cancel().catch(() => undefined);
          throw new Error(`HTTP ${response.status}`);
        }
        const parsed: unknown = await response.json();
        if (!Array.isArray(parsed)) {
          retryable = false;
          throw new Error('Batch response is not an array of results');
        }
        results = parsed as BatchResult<T>[];
        latency = performance.now() - startTime;
      } catch (err) {
        if (!retryable || attempt === this.config.retryAttempts) throw err;
        await new Promise(r => setTimeout(r, 100 * (attempt + 1)));
        continue;
      }
      this.adaptChunkSize(latency, results);
      this.settleOperations(operations, results);
      return;
    }
  }

  /**
   * Resolves or rejects each operation from its matching result. Operations
   * the server did not report on are rejected so their promises cannot hang.
   */
  private settleOperations(operations: typeof this.queue, results: BatchResult<T>[]): void {
    const pending = new Map(operations.map(op => [op.id, op] as const));
    for (const res of results) {
      const op = pending.get(res?.id);
      if (!op) continue;
      pending.delete(op.id);
      if (res.status === 'success') op.resolve(res);
      else op.reject(new Error(res.error || 'Partial failure'));
    }
    for (const op of pending.values()) {
      op.reject(new Error('No result returned for operation'));
    }
  }

  /**
   * Adjusts the chunk size from the rolling average latency (last 20 batches)
   * and the failure rate of the latest batch.
   */
  private adaptChunkSize(latency: number, results: BatchResult<T>[]): void {
    this.latencyHistory.push(latency);
    if (this.latencyHistory.length > 20) this.latencyHistory.shift();
    const avgLatency = this.latencyHistory.reduce((a, b) => a + b, 0) / this.latencyHistory.length;
    // An empty array means no operation was acknowledged; count it as a full
    // failure rather than letting 0/0 produce NaN and skip both branches.
    const errorRate =
      results.length === 0
        ? 1
        : results.filter(r => r.status === 'failed').length / results.length;
    // Adaptive logic: shrink if latency spikes or errors increase, grow if stable
    if (avgLatency > this.config.timeoutMs * 0.7 || errorRate > 0.1) {
      this.currentChunkSize = Math.max(this.config.minChunkSize, this.currentChunkSize - 5);
    } else if (avgLatency < this.config.timeoutMs * 0.3 && errorRate < 0.05) {
      this.currentChunkSize = Math.min(this.config.maxChunkSize, this.currentChunkSize + 2);
    }
  }

  /**
   * Waits until the queue is empty and no chunk is in flight, then closes the
   * connection pool. Calling it again after it has completed is a no-op.
   */
  async drain(): Promise<void> {
    while (this.queue.length > 0 || this.activeChunks > 0) {
      await new Promise(r => setTimeout(r, 100));
    }
    if (this.closed) return;
    this.closed = true;
    await this.agent.close();
  }

  /** Transient statuses worth retrying: server errors, request timeout, rate limiting. */
  private static isRetryableStatus(status: number): boolean {
    return status >= 500 || status === 408 || status === 429;
  }

  /** Returns `value` floored to an integer if it is finite and at least `min`, else `fallback`. */
  private static intOr(value: number | undefined, fallback: number, min: number): number {
    if (typeof value !== 'number' || !Number.isFinite(value)) return fallback;
    const floored = Math.floor(value);
    return floored >= min ? floored : fallback;
  }
}
2. Usage Example
// Mock server endpoint expectation:
// POST /batch
// Body: { operations: [{ id: "uuid", payload: { ... } }] }
// Response: [{ id: "uuid", status: "success" | "failed", data?: ..., error?: ... }]
/**
 * Demo driver: enqueues 1000 profile-update operations, waits for all of them
 * to settle, prints a success/failure tally, then drains the processor.
 */
async function main() {
  const processor = new BatchProcessor<{ userId: string; action: string }>({
    url: 'http://localhost:3000/batch',
    maxChunkSize: 50,
    minChunkSize: 10,
    maxConcurrency: 4,
    timeoutMs: 3000,
    retryAttempts: 2
  });

  const pending: Array<ReturnType<typeof processor.add>> = [];
  for (let i = 0; i < 1000; i++) {
    pending.push(processor.add({ userId: `user_${i}`, action: 'update_profile' }));
  }

  // allSettled never rejects, so one failed operation cannot abort the tally.
  const outcomes = await Promise.allSettled(pending);
  let successes = 0;
  let failures = 0;
  for (const outcome of outcomes) {
    if (outcome.status === 'fulfilled') successes += 1;
    else failures += 1;
  }

  console.log(`Completed: ${successes} success, ${failures} failed`);
  await processor.drain();
}

main().catch(console.error);
Pitfall Guide
| Symptom | Root Cause | Troubleshooting & Fix |
|---|---|---|
| Memory OOM under load | Unbounded queue growth when consumer is slower than producer | Implement backpressure: drop oldest requests, reject new ones, or use a bounded AsyncQueue with await queue.push() |
| Silent partial failures | Batch endpoint returns 200 OK but marks individual items as failed; client treats entire batch as success | Always parse the response array. Map each id to its status. Never assume batch success without iterating results. |
| Rate limit spikes (429s) | Fixed chunk size sends bursts that exceed token bucket limits | Enable adaptive chunking (shown in code). Add exponential backoff with jitter: delay = Math.min(base * 2 ** attempt, max) * (0.5 + Math.random()) |
| TLS handshake storms | Connection pool exhausted or keepAlive disabled, forcing repeated handshakes | Configure Agent with keepAliveTimeout: 30000, connections: maxConcurrency * maxChunkSize. Verify Connection: keep-alive headers. |
| Idempotency key collisions | Reusing keys across different payloads or batches causes stale responses | Generate keys per batch execution, not per operation. Use randomUUID() or deterministic hash of payload + sequence counter. |
| Timeout cascades | Chunk size too large for payload serialization/deserialization time | Monitor p95 latency vs timeoutMs. If serialization dominates, reduce maxChunkSize or switch to streaming/chunked transfer encoding. |
Debugging Checklist:
- Enable undici debug logging: NODE_DEBUG=undici node app.js
- Verify connection pool utilization: agent.stats.connected vs agent.stats.pending
- Trace batch correlation IDs through server logs to confirm partial failure mapping
- Use clinic.js or 0x to profile memory leaks in long-running batch processors
Production Bundle
Configuration Matrix
| Parameter | Recommended Range | Tuning Guidance |
|---|---|---|
| maxChunkSize | 20–100 | Start at 50. Reduce if payload > 1MB or server deserialization is CPU-bound. |
| maxConcurrency | 2–8 | Match to server-side thread pool or event loop capacity. Avoid exceeding the connections limit. |
| timeoutMs | 2000–5000 | Set to 1.5x expected p95 batch latency. Use a per-request AbortSignal.timeout(). |
| retryAttempts | 1–3 | Only retry on transient errors (5xx, network reset). Never retry on 4xx or idempotency violations. |
Observability Setup
Integrate OpenTelemetry metrics to track batch health:
import { metrics } from '@opentelemetry/api';
// Instruments are created once at module scope and reused for every batch.
const meter = metrics.getMeter('batch-processor');
const batchLatency = meter.createHistogram('batch.latency', { unit: 'ms' });
const batchSuccess = meter.createCounter('batch.success');
const batchFailure = meter.createCounter('batch.failure');

// Inside executeChunk():
// Record the batch latency, tagged with the number of operations sent.
batchLatency.record(latency, { chunk_size: operations.length });
// Count every per-operation outcome individually.
for (const result of results) {
  if (result.status === 'success') {
    batchSuccess.add(1);
  } else {
    batchFailure.add(1);
  }
}
Dashboard Queries:
rate(batch_failure_total[5m]) / (rate(batch_success_total[5m]) + rate(batch_failure_total[5m])) → Alert if > 5%
histogram_quantile(0.95, sum by (le) (rate(batch_latency_bucket[5m]))) → Alert if > 80% of timeout
agent_connections_total vs agent_connections_active → Detect pool starvation
Runbook & Deployment Checklist
Batching is not a configuration toggle; it is an architectural contract. When chunking, idempotency, and observability are treated as first-class concerns, HTTP clients transition from latency liabilities to predictable, cost-efficient data pipelines.