isecond inference overhead.
Step 1: Data Ingestion & Window Aggregation
Metrics and logs arrive via Kafka or Redis Streams. Raw points are insufficient for anomaly scoring because they lack temporal context. Before inference, implement a sliding (hopping) window that emits rolling statistics once per step interval.
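import { Readable } from 'stream';
import { MetricsPoint, WindowedMetrics } from './types';
// Sliding (hopping) window over event time: after the first full window, emit statistics
// for the last windowSizeMs once every stepMs. Assumes points arrive roughly in timestamp order.
export class MetricWindowAggregator {
private buffer: MetricsPoint[] = [];
private firstTs: number | null = null;
private lastEmitTs: number | null = null;
constructor(
private readonly windowSizeMs = 60000,
private readonly stepMs = 10000,
private readonly sequenceLength = 10
) {}
async *process(stream: Readable): AsyncGenerator<WindowedMetrics> {
// The stream must be in object mode and yield MetricsPoint objects.
for await (const point of stream as AsyncIterable<MetricsPoint>) {
this.buffer.push(point);
// Use event time (point.timestamp), not Date.now(), so replayed or delayed data is windowed consistently.
const windowEnd = point.timestamp;
// Evict points that have fallen out of the window.
this.buffer = this.buffer.filter(p => windowEnd - p.timestamp < this.windowSizeMs);
if (this.firstTs === null) this.firstTs = windowEnd;
// Wait for one full window, then emit at most once per step (the original never used stepMs).
if (windowEnd - this.firstTs < this.windowSizeMs) continue;
if (this.lastEmitTs !== null && windowEnd - this.lastEmitTs < this.stepMs) continue;
this.lastEmitTs = windowEnd;
yield this.computeWindow(windowEnd);
}
}
// Called only with a non-empty buffer: the current point is always retained.
private computeWindow(windowEnd: number): WindowedMetrics {
const values = this.buffer.map(p => p.value);
const mean = values.reduce((a, b) => a + b, 0) / values.length;
const variance = values.reduce((acc, v) => acc + (v - mean) ** 2, 0) / values.length;
return {
timestamp: windowEnd,
mean,
stdDev: Math.sqrt(variance),
// reduce() avoids the argument-count limit Math.min(...values) can hit on very large windows
min: values.reduce((a, b) => Math.min(a, b), Infinity),
max: values.reduce((a, b) => Math.max(a, b), -Infinity),
count: values.length,
// Last sequenceLength raw values for model input; shorter until the window holds enough points
rawSequence: values.slice(-this.sequenceLength)
};
}
}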
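The `./types` module imported above is not shown. The following is a minimal sketch of what it might contain, with field names inferred from how the aggregator uses them. It also includes a hypothetical `parseMetricsPoint` helper that validates one JSON-encoded message (for example, a Kafka record value) before the point reaches the window.

```typescript
// Hypothetical contents of './types'; fields inferred from the aggregator's usage.
export interface MetricsPoint {
  timestamp: number; // event time, epoch milliseconds
  value: number;
}

export interface WindowedMetrics {
  timestamp: number; // window end (event time, epoch ms)
  mean: number;
  stdDev: number;
  min: number;
  max: number;
  count: number;
  rawSequence: number[]; // most recent raw values, used as model input
}

// Hypothetical helper: parse and validate one JSON-encoded message.
// Returns null for malformed payloads so a single bad record cannot crash the stream.
export function parseMetricsPoint(raw: string): MetricsPoint | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== 'object' || data === null) return null;
  const { timestamp, value } = data as Record<string, unknown>;
  if (typeof timestamp !== 'number' || !Number.isFinite(timestamp)) return null;
  if (typeof value !== 'number' || !Number.isFinite(value)) return null;
  return { timestamp, value };
}
```

A consumer could drop any record for which `parseMetricsPoint` returns `null` (or route it to a dead-letter topic) before pushing points into the aggregator's object-mode stream.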
Step 2: Feature Normalization & Tensor Construction
Models require consistent scaling. Apply min-max or z-score normalization per metric family, not globally. Construct fixed-size tensors for ONNX inference.
import * as ort from 'onnxruntime-node';
// Min-max normalization with bounds tracked per key. Use a metric-family (or metric) key, never one global range.
export class FeatureNormalizer {
private minMap = new Map<string, number>();
private maxMap = new Map<string, number>();
// Scale into [0, 1] using the bounds seen so far; values outside those bounds map outside [0, 1].
normalize(metricId: string, value: number): number {
const min = this.minMap.get(metricId) ?? value;
const max = this.maxMap.get(metricId) ?? value;
const range = max - min || 1; // avoid division by zero when min === max
return (value - min) / range;
}
updateBounds(metricId: string, value: number): void {
this.minMap.set(metricId, Math.min(this.minMap.get(metricId) ?? value, value));
this.maxMap.set(metricId, Math.max(this.maxMap.get(metricId) ?? value, value));
}
}
// Shape: [batch=1, sequence_length, features=1]. The length must match what the model was exported with.
// Tensor construction is synchronous, so no Promise is needed.
export function buildTensor(normalizedSequence: number[]): ort.Tensor {
return new ort.Tensor('float32', Float32Array.from(normalizedSequence), [1, normalizedSequence.length, 1]);
}
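The text also allows z-score normalization, but only min-max is shown. The following is a minimal sketch of per-family z-scoring. It assumes cumulative statistics are acceptable and uses Welford's online algorithm, so the mean and variance update in O(1) per point without storing history. The class name `ZScoreNormalizer` is hypothetical.

```typescript
// Running mean/variance for one metric family (Welford's online algorithm).
interface RunningStats {
  count: number;
  mean: number;
  m2: number; // sum of squared deviations from the current mean
}

export class ZScoreNormalizer {
  private stats = new Map<string, RunningStats>();

  // Fold a new observation into the family's running statistics.
  update(family: string, value: number): void {
    const s = this.stats.get(family) ?? { count: 0, mean: 0, m2: 0 };
    s.count += 1;
    const delta = value - s.mean;
    s.mean += delta / s.count;
    s.m2 += delta * (value - s.mean);
    this.stats.set(family, s);
  }

  // z = (value - mean) / std. Returns 0 before two points are seen, or when std is 0.
  normalize(family: string, value: number): number {
    const s = this.stats.get(family);
    if (!s || s.count < 2) return 0;
    const std = Math.sqrt(s.m2 / s.count); // population std, matching the rest of the pipeline
    return std === 0 ? 0 : (value - s.mean) / std;
  }
}
```

Cumulative statistics never forget old data. For metrics whose baseline drifts, a windowed or exponentially weighted variant is probably a better fit.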
Step 3: Model Inference & Score Calibration
Load a pre-trained temporal autoencoder or isolation forest exported to ONNX, and run inference asynchronously. The raw reconstruction error or anomaly score has no fixed scale. Calibrate it against a rolling baseline of recent scores to produce actionable thresholds.
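import * as ort from 'onnxruntime-node';
export class AnomalyDetector {
private session: ort.InferenceSession | null = null;
private scoreHistory: number[] = [];
// outputName must match the exported graph; 'reconstruction_error' assumes the model computes the error itself.
// minHistory (30) is an assumed warm-up size before any score can be flagged.
constructor(
private readonly calibrationWindow = 500,
private readonly stdMultiplier = 3,
private readonly minHistory = 30,
private readonly outputName = 'reconstruction_error'
) {}
async loadModel(modelPath: string) {
this.session = await ort.InferenceSession.create(modelPath);
}
async detect(tensor: ort.Tensor): Promise<{ score: number; isAnomaly: boolean; threshold: number }> {
if (!this.session) throw new Error('Model not loaded');
// Feed the model's first declared input instead of hardcoding a name such as 'input'.
const feeds = { [this.session.inputNames[0]]: tensor };
const output = await this.session.run(feeds);
const scoreTensor = output[this.outputName];
if (!scoreTensor) throw new Error(`Model has no output named ${this.outputName}`);
const rawScore = Number(scoreTensor.data[0]);
// Adaptive threshold: mean + k*std of *previous* scores, so the current score cannot inflate its own baseline.
// During warm-up the threshold is Infinity and nothing is flagged.
let threshold = Infinity;
const n = this.scoreHistory.length;
if (n >= this.minHistory) {
const mean = this.scoreHistory.reduce((a, b) => a + b, 0) / n;
const std = Math.sqrt(this.scoreHistory.reduce((acc, v) => acc + (v - mean) ** 2, 0) / n);
threshold = mean + this.stdMultiplier * std;
}
this.scoreHistory.push(rawScore);
if (this.scoreHistory.length > this.calibrationWindow) this.scoreHistory.shift();
return { score: rawScore, isAnomaly: rawScore > threshold, threshold };
}
}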
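The mean + 3*std rule implicitly assumes roughly Gaussian scores, but reconstruction errors are non-negative and frequently skewed. The following is a minimal alternative sketch. It assumes you have a sample of scores from a known-normal period and sets the threshold at the empirical (1 - target false positive rate) quantile, so roughly that fraction of normal windows would exceed it. The function names `quantileThreshold` and `exceedanceRate` and the nearest-rank method are choices made here, not part of the original pipeline.

```typescript
// Threshold at the empirical (1 - targetFpr) quantile of scores from normal traffic.
// Nearest-rank method: the smallest score with at least (1 - targetFpr) of the sample at or below it.
export function quantileThreshold(normalScores: number[], targetFpr: number): number {
  if (normalScores.length === 0) throw new Error('need at least one score');
  if (!(targetFpr > 0 && targetFpr < 1)) throw new Error('targetFpr must be in (0, 1)');
  const sorted = [...normalScores].sort((a, b) => a - b);
  const rank = Math.ceil((1 - targetFpr) * sorted.length); // 1-based nearest rank
  return sorted[Math.min(Math.max(rank, 1), sorted.length) - 1];
}

// Fraction of a sample strictly above the threshold, e.g. to check the achieved false positive rate.
export function exceedanceRate(scores: number[], threshold: number): number {
  return scores.filter(s => s > threshold).length / scores.length;
}
```

In the detector, this could replace `mean + stdMultiplier * std`, with the threshold recomputed periodically from a trailing sample of scores collected during shadow mode.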
Step 4: Alert Routing & Feedback Loop
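Do not alert on every detection. Deduplicate repeat alerts with a per-metric cooldown, and add hysteresis so that a single noisy window cannot open an incident. Route alerts to incident management systems with context, and capture engineer acknowledgments to drive retraining or recalibration.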
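// Requires Node.js 18+ for the global fetch API.
export class AlertRouter {
private lastAlertAt = new Map<string, number>();
constructor(
private readonly webhookUrl = 'https://hooks.slack.com/services/...',
private readonly cooldownMs = 300000 // 5 minutes
) {}
async route(metricId: string, result: { score: number; isAnomaly: boolean; threshold: number }) {
if (!result.isAnomaly) return;
const now = Date.now();
const lastAlert = this.lastAlertAt.get(metricId);
// Deduplicate: at most one alert per metric per cooldown window.
if (lastAlert !== undefined && now - lastAlert < this.cooldownMs) return;
// Reserve the slot before the network call so concurrent detections do not double-send.
this.lastAlertAt.set(metricId, now);
const severity = result.score > result.threshold * 2 ? 'critical' : 'warning';
try {
const res = await fetch(this.webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
// Slack incoming webhooks require a `text` (or `blocks`) field, so the context goes into the message text.
body: JSON.stringify({
text: `[${severity}] ${metricId}: score ${result.score.toFixed(4)} exceeded threshold ${result.threshold.toFixed(4)} at ${new Date(now).toISOString()}`
})
});
if (!res.ok) throw new Error(`Alert webhook returned ${res.status}`);
} catch (err) {
// Undo the reservation so the next anomalous window can retry delivery.
if (lastAlert === undefined) this.lastAlertAt.delete(metricId);
else this.lastAlertAt.set(metricId, lastAlert);
throw err;
}
}
}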
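The router above deduplicates with a cooldown but does not implement hysteresis. The following is a minimal sketch of hysteresis as a per-metric state machine. It assumes an alert should open only after `openAfter` consecutive anomalous windows and close only after `closeAfter` consecutive normal ones. The class name, both parameters, and their defaults are illustrative.

```typescript
type Transition = 'opened' | 'closed' | null;

interface MetricState {
  open: boolean;
  anomalousRun: number; // consecutive anomalous windows
  normalRun: number; // consecutive normal windows
}

// Hysteresis: require sustained evidence to open an alert and sustained recovery to close it,
// so a single noisy window can neither fire an alert nor make an open one flap.
export class AlertHysteresis {
  private states = new Map<string, MetricState>();

  constructor(private readonly openAfter = 3, private readonly closeAfter = 5) {}

  observe(metricId: string, isAnomaly: boolean): Transition {
    const s = this.states.get(metricId) ?? { open: false, anomalousRun: 0, normalRun: 0 };
    this.states.set(metricId, s);
    if (isAnomaly) {
      s.anomalousRun += 1;
      s.normalRun = 0;
    } else {
      s.normalRun += 1;
      s.anomalousRun = 0;
    }
    if (!s.open && s.anomalousRun >= this.openAfter) {
      s.open = true;
      return 'opened';
    }
    if (s.open && s.normalRun >= this.closeAfter) {
      s.open = false;
      return 'closed';
    }
    return null;
  }
}
```

One possible wiring is to call `router.route` only when `observe` returns `'opened'`, and to post a resolve message on `'closed'`.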
Architecture Decisions & Rationale
- TypeScript orchestration: Native async streams, strong typing for telemetry payloads, and seamless integration with cloud SDKs reduce runtime errors in production pipelines.
- ONNX Runtime: Decouples model training (Python/PyTorch) from inference (Node.js). For small models such as lightweight autoencoders, it can deliver sub-10ms inference on standard CPU containers without a GPU dependency.
- Online calibration: Static thresholds fail under drift. Computing mean/std over a sliding window of scores adapts to gradual baseline shifts without retraining.
- Decoupled alerting: The detector holds only calibration state and knows nothing about alerts. Alert routing handles deduplication, cooldowns, and external integrations, which prevents cascade failures when detections spike.
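The online calibration described above recomputes the mean and standard deviation over the whole score window on every call, which costs O(n). The following is a minimal sketch of an O(1) alternative using a fixed-capacity ring buffer with running sums. The class name `RollingStats` is hypothetical. Running sums of squares can lose precision when scores are large relative to their spread, so the variance is clamped at zero. Periodic recomputation would be a further safeguard.

```typescript
// Fixed-size sliding window with O(1) updates of mean and population standard deviation.
export class RollingStats {
  private buf: Float64Array;
  private next = 0; // index of the slot to overwrite
  private size = 0; // number of valid entries
  private sum = 0;
  private sumSq = 0;

  constructor(private readonly capacity: number) {
    if (capacity < 1) throw new Error('capacity must be >= 1');
    this.buf = new Float64Array(capacity);
  }

  push(x: number): void {
    if (this.size === this.capacity) {
      // Evict the oldest value, which sits in the slot about to be overwritten.
      const old = this.buf[this.next];
      this.sum -= old;
      this.sumSq -= old * old;
    } else {
      this.size += 1;
    }
    this.buf[this.next] = x;
    this.next = (this.next + 1) % this.capacity;
    this.sum += x;
    this.sumSq += x * x;
  }

  get count(): number {
    return this.size;
  }

  get mean(): number {
    return this.size === 0 ? 0 : this.sum / this.size;
  }

  get std(): number {
    if (this.size === 0) return 0;
    const m = this.mean;
    // Clamp at 0: floating-point cancellation can make the variance slightly negative.
    return Math.sqrt(Math.max(0, this.sumSq / this.size - m * m));
  }

  // Adaptive threshold in the same form the detector uses: mean + k * std.
  threshold(k = 3): number {
    return this.mean + k * this.std;
  }
}
```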
Pitfall Guide
- Training on non-representative baselines: Models trained during low-traffic periods learn narrow distributions. Production traffic introduces seasonality, batch jobs, and deployment spikes. Always train on multi-week windows covering peak, trough, and deployment cycles.
- Treating reconstruction error as a probability: Autoencoders output distance metrics, not probabilities, so mapping a score of 0.85 to 85% confidence is mathematically invalid. Use empirical calibration instead: collect scores over 7 days, fit a distribution, and set thresholds at the desired false positive rate.
- Ignoring feature scaling per metric family: Global normalization collapses variance across unrelated metrics. CPU utilization and request latency operate on different scales. Normalize within metric namespaces to preserve relative anomaly signals.
- Deploying without shadow mode: Production inference must run in parallel with existing monitoring for 14–30 days. Compare AI alerts against historical incidents, and measure precision, recall, and alert overlap before enabling active routing.
- Missing drift detection in the pipeline: Data and concept drift degrade model accuracy silently. Run statistical tests such as the Kolmogorov-Smirnov (KS) test or the Population Stability Index (PSI) on input feature distributions. When drift exceeds a threshold, trigger retraining or fall back to statistical baselines.
- Over-engineering inference latency: Complex transformers or large ensembles can add 200–500ms per inference. For real-time metrics, prefer lightweight autoencoders or isolation forests, and reserve heavy models for batch log analysis or post-incident enrichment.
- No feedback loop for threshold tuning: Engineers dismiss alerts that lack context. Track acknowledgments and dismissals, and use that positive and negative feedback to adjust calibration windows, update thresholds, or flag models for retraining.
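For the shadow-mode pitfall, the following is a minimal sketch of how precision and recall could be measured. It assumes each shadow alert has a timestamp and each historical incident has a start and end time. An alert counts as a true positive if it falls inside an incident window, optionally extended by a grace period before the start so that early warnings still count. The field names, the function name `evaluateShadowRun`, and the matching rule are assumptions for illustration.

```typescript
interface ShadowAlert {
  ts: number; // epoch ms
}

interface Incident {
  start: number; // epoch ms
  end: number; // epoch ms
}

// Precision: share of alerts that fall inside some (grace-extended) incident window.
// Recall: share of incidents that received at least one alert.
export function evaluateShadowRun(
  alerts: ShadowAlert[],
  incidents: Incident[],
  graceMs = 0
): { precision: number; recall: number } {
  const hits = (a: ShadowAlert, i: Incident) => a.ts >= i.start - graceMs && a.ts <= i.end;
  const truePositiveAlerts = alerts.filter(a => incidents.some(i => hits(a, i))).length;
  const detectedIncidents = incidents.filter(i => alerts.some(a => hits(a, i))).length;
  return {
    precision: alerts.length === 0 ? 0 : truePositiveAlerts / alerts.length,
    recall: incidents.length === 0 ? 0 : detectedIncidents / incidents.length,
  };
}
```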
Production best practices: Run detection pipelines in isolated containers with resource limits. Version models alongside pipeline code. Expose inference metrics (latency, score distribution, calibration drift) to observability platforms. Maintain fallback rule-based alerts during model deployment windows.
Production Bundle
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-frequency infrastructure metrics (CPU, memory, latency) | Temporal Autoencoder + Online Calibration | Captures sequential dependencies; sub-second latency; low compute overhead | Low (CPU-bound inference) |
| Sparse, irregular business metrics (conversion rate, checkout failures) | Statistical ML (Isolation Forest / LOF) | Robust to missing data; requires fewer sequential samples; easier to explain | Low-Medium (periodic retraining) |
| Unstructured application logs with free-text errors | LLM-Assisted Classification + Embedding | Parses semantic anomalies; generates root-cause context; handles novel error patterns | High (token costs, GPU/LLM API) |
| Multi-dimensional service mesh traces | Graph-based Anomaly Detection (GNN) | Models dependency relationships; detects cascading failures across services | High (GPU required, complex training) |
| Legacy monolith with stable baselines | Static Thresholds + Seasonal Adjustments | Simpler to maintain; lower operational overhead; sufficient for non-dynamic workloads | Low (minimal compute) |
Configuration Template
pipeline:
  ingestion:
    source: kafka
    topic: telemetry.metrics
    group_id: anomaly-detector-v1
    concurrency: 4
  windowing:
    size_ms: 60000
    step_ms: 10000
    min_samples: 15
  normalization:
    strategy: per_metric_family
    update_frequency: 1000_points
  model:
    path: /models/temporal_autoencoder.onnx
    batch_size: 1
    calibration_window: 500
    std_multiplier: 3.0
    fallback_strategy: statistical_baseline
  alerting:
    cooldown_ms: 300000
    severity_thresholds:
      warning: 1.5
      critical: 2.5
    webhook: https://hooks.slack.com/services/xxx
    deduplication: metric_id + severity
    dry_run: false  # set to true for shadow mode
  monitoring:
    drift_test: psi
    drift_threshold: 0.2
    metrics_export: prometheus
    health_check_interval: 10s
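The monitoring block selects PSI (Population Stability Index) with a drift threshold of 0.2. PSI sums, over bins, (actual share - expected share) * ln(actual share / expected share). The following is a minimal sketch of that computation. It assumes bin edges come from the reference (training) sample's quantiles and that a small epsilon guards against empty bins. The bin count, the epsilon, and the function name `psi` are illustrative choices.

```typescript
// Population Stability Index between a reference sample (e.g. training data) and a recent sample.
// Bin edges come from the reference sample's quantiles, so each bin holds roughly 1/bins of it.
export function psi(reference: number[], current: number[], bins = 10, eps = 1e-6): number {
  if (reference.length === 0 || current.length === 0) throw new Error('empty sample');
  const sorted = [...reference].sort((a, b) => a - b);
  // Interior edges at the 1/bins, 2/bins, ... quantiles of the reference sample.
  const edges: number[] = [];
  for (let k = 1; k < bins; k++) {
    edges.push(sorted[Math.floor((k * sorted.length) / bins)]);
  }
  const shares = (xs: number[]): number[] => {
    const counts = new Array<number>(bins).fill(0);
    for (const x of xs) {
      let b = 0;
      while (b < edges.length && x >= edges[b]) b++; // bin index = number of edges <= x
      counts[b]++;
    }
    // eps keeps the logarithm finite when a bin is empty in either sample
    return counts.map(c => Math.max(c / xs.length, eps));
  };
  const expected = shares(reference);
  const actual = shares(current);
  let total = 0;
  for (let i = 0; i < bins; i++) {
    total += (actual[i] - expected[i]) * Math.log(actual[i] / expected[i]);
  }
  return total;
}
```

A drift check could compare this value against `drift_threshold` for each input feature, and trigger retraining or the configured `fallback_strategy` when the threshold is exceeded.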
Quick Start Guide
- Initialize project and dependencies
mkdir anomaly-pipeline && cd anomaly-pipeline
npm init -y
npm install onnxruntime-node
npm install --save-dev typescript ts-node @types/node
npx tsc --init --target ES2020 --module commonjs --outDir dist
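- Export a pre-trained model to ONNX
Train a temporal autoencoder or isolation forest in Python, then export it to ONNX (for example, with torch.onnx.export for PyTorch models or skl2onnx for scikit-learn models). Place the .onnx file in the /models/ directory referenced by model.path. Make sure the graph's input and output names match what the detector expects; the Step 3 code reads an output named reconstruction_error.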
- Run the streaming processor
ts-node src/main.ts --config pipeline.yaml
The service connects to the configured stream, aggregates windows, runs inference, calibrates scores, and routes alerts.
- Validate with synthetic telemetry
Inject normal traffic for 10 minutes, then spike a metric by 300%. Verify that an alert triggers once enough scores have accumulated for calibration. Check the Prometheus metrics for inference latency and score distribution.
- Enable shadow mode before production routing
Set alerting.dry_run: true in the configuration and run for 14 days. Compare the generated alerts against incident history. Adjust std_multiplier and calibration_window until the false positive rate aligns with operational tolerance, then switch to dry_run: false once validated.
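For the synthetic-telemetry check, the following is a minimal generator sketch. It assumes one point every `intervalMs`, with small uniform noise around a baseline, followed by a spike phase. "Spike by 300%" is read here as a 300% increase, i.e. four times the baseline. The function name and all parameters are hypothetical. A seeded PRNG (mulberry32) keeps runs reproducible.

```typescript
interface SyntheticPoint {
  timestamp: number; // epoch ms
  value: number;
}

// mulberry32: tiny seeded PRNG returning floats in [0, 1), for reproducible test data.
function mulberry32(seed: number): () => number {
  let a = seed >>> 0;
  return () => {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}

export function syntheticSeries(opts: {
  startTs: number;
  intervalMs: number;
  normalMs: number; // duration of normal traffic
  spikeMs: number; // duration of the spike
  baseline: number;
  noise: number; // max absolute noise added to each point
  spikeIncrease: number; // 3 means +300%, i.e. 4x the baseline
  seed?: number;
}): SyntheticPoint[] {
  const rand = mulberry32(opts.seed ?? 42);
  const points: SyntheticPoint[] = [];
  const total = opts.normalMs + opts.spikeMs;
  for (let dt = 0; dt < total; dt += opts.intervalMs) {
    const level = dt < opts.normalMs ? opts.baseline : opts.baseline * (1 + opts.spikeIncrease);
    const jitter = (rand() * 2 - 1) * opts.noise; // uniform in [-noise, noise)
    points.push({ timestamp: opts.startTs + dt, value: level + jitter });
  }
  return points;
}
```

The resulting array could be wrapped in an object-mode stream with Node's `Readable.from(...)` and passed to the Step 1 aggregator's `process` method.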