yload requires careful memory management. We implement a strict size limit to prevent unbounded buffering, and we run CPU-intensive NLP analysis in a thread pool to avoid blocking the asyncio event loop.
Step 4: Wire Observability & Cost Tracking
Every intercepted request emits structured metrics. Prometheus histograms track latency distribution, while counters track policy violations and redaction events. OpenTelemetry spans attach to the middleware lifecycle, enabling distributed tracing across agent tool calls and downstream dependencies.
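Here is what the latency histogram actually records, as a small dependency-free sketch of Prometheus-style cumulative buckets. It uses the same bucket bounds as LATENCY_HISTOGRAM in the implementation below. The CumulativeHistogram class and its quantile method are illustrative only and are not part of prometheus_client. The quantile estimate follows the linear-interpolation idea behind PromQL's histogram_quantile, which is how p95/p99 alerts are typically derived from bucket counts.

```python
import bisect
import math
from typing import List, Sequence


class CumulativeHistogram:
    """Illustrative Prometheus-style histogram: per-bucket counts reported cumulatively ("le")."""

    def __init__(self, bounds: Sequence[float]) -> None:
        self.bounds = sorted(bounds)
        self.counts = [0] * (len(self.bounds) + 1)  # final slot is the implicit +Inf bucket
        self.sum = 0.0
        self.count = 0

    def observe(self, value: float) -> None:
        # bisect_left finds the first bound >= value, i.e. the smallest bucket with value <= le
        self.counts[bisect.bisect_left(self.bounds, value)] += 1
        self.sum += value
        self.count += 1

    def cumulative(self) -> List[int]:
        """Running totals, as exposed in the *_bucket{le="..."} series."""
        out, running = [], 0
        for c in self.counts:
            running += c
            out.append(running)
        return out

    def quantile(self, q: float) -> float:
        """Estimate the q-quantile by interpolating inside the bucket that holds rank q*count."""
        if self.count == 0:
            return math.nan
        rank = q * self.count
        cum = self.cumulative()
        prev_count, lower = 0, 0.0  # the lowest bucket is assumed to start at 0
        for i, upper in enumerate(self.bounds):
            if cum[i] >= rank:
                in_bucket = cum[i] - prev_count
                if in_bucket == 0:
                    return lower
                return lower + (upper - lower) * (rank - prev_count) / in_bucket
            prev_count, lower = cum[i], upper
        # Rank falls in the +Inf bucket: report the highest finite bound
        return self.bounds[-1]


hist = CumulativeHistogram([0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0])
for seconds in [0.03, 0.07, 0.08, 0.2, 0.4, 0.9, 3.0, 7.0]:
    hist.observe(seconds)
print(hist.cumulative())             # [1, 3, 4, 5, 6, 6, 7, 8]
print(round(hist.quantile(0.5), 3))  # 0.25
print(hist.quantile(0.95))           # 5.0 (the rank lands in the +Inf bucket)
```

Bucket bounds cap the precision of any percentile. A p95 that lands above 5.0 s can only be reported as 5.0, so choose the bounds around the latencies you alert on.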
Step 5: Enforce Dependency Injection for Sandboxing
Agent tool access is restricted through scoped dependency injection. Before a tool executes, an injected dependency validates that the requesting context holds the required permissions. This limits the blast radius if an agent enters an unintended execution path.
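The scoping idea can be sketched without any framework. In FastAPI, the context object below would typically come from a dependency declared with Depends. AgentContext, ToolRegistry, and the tool names are hypothetical and not part of this guide's modules. The registry hands each context only the tools its scopes allow, and it re-checks the scope at call time.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, FrozenSet, Tuple

Tool = Callable[..., Any]


@dataclass(frozen=True)
class AgentContext:
    """Hypothetical request-scoped identity; a FastAPI dependency would build this per request."""
    agent_id: str
    scopes: FrozenSet[str]


@dataclass
class ToolRegistry:
    """Hypothetical registry mapping tool name -> (required scope, implementation)."""
    _tools: Dict[str, Tuple[str, Tool]] = field(default_factory=dict)

    def register(self, name: str, required_scope: str, fn: Tool) -> None:
        self._tools[name] = (required_scope, fn)

    def scoped_for(self, ctx: AgentContext) -> Dict[str, Tool]:
        """Inject only the tools this context may use; everything else is simply absent."""
        return {name: fn for name, (scope, fn) in self._tools.items() if scope in ctx.scopes}

    def invoke(self, ctx: AgentContext, name: str, *args: Any, **kwargs: Any) -> Any:
        """Re-check the scope at call time (defense in depth), e.g. when the tool name comes from agent output."""
        scope, fn = self._tools[name]  # unknown tool names raise KeyError
        if scope not in ctx.scopes:
            raise PermissionError(f"{ctx.agent_id} lacks scope {scope!r} for tool {name!r}")
        return fn(*args, **kwargs)


registry = ToolRegistry()
registry.register("search_docs", "read", lambda q: f"results for {q}")
registry.register("delete_record", "write", lambda rid: f"deleted {rid}")

reader = AgentContext(agent_id="agent-42", scopes=frozenset({"read"}))
print(sorted(registry.scoped_for(reader)))                   # ['search_docs']
print(registry.invoke(reader, "search_docs", "pii policy"))  # results for pii policy
```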
Implementation
```python
# control_plane/safety_interceptor.py
import asyncio
import logging
import time
from typing import Optional

from fastapi import Request
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from prometheus_client import Counter, Histogram
from starlette.datastructures import Headers
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, Response

logger = logging.getLogger("control_plane.safety")

# ---- Telemetry Definitions -------------------------------------------
LATENCY_HISTOGRAM = Histogram(
    "agent_response_latency_seconds",
    "Time taken to process and validate agent output",
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)
POLICY_BLOCKED = Counter(
    "agent_policy_blocks_total",
    "Count of responses blocked by compliance rules",
    labelnames=["rule_type"],
)
PII_REDACTED = Counter(
    "agent_pii_redactions_total",
    "Count of PII entities detected and masked",
    labelnames=["entity_category"],
)

# ---- Configuration Constants -----------------------------------------
MAX_RESPONSE_BYTES = 2 * 1024 * 1024  # 2 MB hard limit
FORBIDDEN_PATTERNS = [
    "ignore previous instructions",
    "override system prompt",
    "disregard safety guidelines",
    "you are now acting as",
]
# Long-lived streams are passed through unbuffered (see Pitfall 7)
STREAMING_CONTENT_TYPES = ("text/event-stream",)


class ComplianceEngine:
    """Evaluates output against deterministic policy rules."""

    def __init__(self, patterns: list[str]) -> None:
        self._patterns = [p.lower() for p in patterns]

    def evaluate(self, payload: str) -> Optional[str]:
        """Return a label for the first forbidden pattern found, or None."""
        lower_payload = payload.lower()
        for pattern in self._patterns:
            if pattern in lower_payload:
                return f"pattern_match:{pattern}"
        return None


class DataRedactor:
    """Context-aware PII detection and masking using Presidio."""

    def __init__(self, languages: Optional[list[str]] = None) -> None:
        # The default AnalyzerEngine loads an English spaCy pipeline only; any other
        # language code needs an NLP engine and recognizers configured for it.
        self._analyzer = AnalyzerEngine()
        self._anonymizer = AnonymizerEngine()
        self._languages = languages or ["en"]
        self._target_entities = [
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
            "CREDIT_CARD", "IBAN_CODE", "IP_ADDRESS",
        ]
        self._replacement_map = {
            "PERSON": "[REDACTED_USER]",
            "EMAIL_ADDRESS": "[REDACTED_EMAIL]",
            "PHONE_NUMBER": "[REDACTED_PHONE]",
            "CREDIT_CARD": "[REDACTED_CARD]",
            "IBAN_CODE": "[REDACTED_IBAN]",
            "IP_ADDRESS": "[REDACTED_IP]",
        }

    def sanitize(self, text: str) -> str:
        """Detect target entities in every configured language and mask them."""
        all_findings = []
        for lang in self._languages:
            findings = self._analyzer.analyze(
                text=text,
                language=lang,
                entities=self._target_entities,
                score_threshold=0.65,
            )
            all_findings.extend(findings)
        if not all_findings:
            return text
        for finding in all_findings:
            PII_REDACTED.labels(entity_category=finding.entity_type).inc()
            # Log metadata only, never the matched text (see Pitfall 4)
            logger.debug("PII detected: type=%s confidence=%.2f", finding.entity_type, finding.score)
        sanitized = self._anonymizer.anonymize(
            text=text,
            analyzer_results=all_findings,
            operators={
                entity: OperatorConfig("replace", {"new_value": replacement})
                for entity, replacement in self._replacement_map.items()
            },
        )
        return sanitized.text


class SafetyInterceptor(BaseHTTPMiddleware):
    """External control plane middleware for LLM agent responses."""

    def __init__(self, app, languages: Optional[list[str]] = None) -> None:
        super().__init__(app)
        self._compliance = ComplianceEngine(FORBIDDEN_PATTERNS)
        self._redactor = DataRedactor(languages)

    async def dispatch(self, request: Request, call_next) -> Response:
        start = time.perf_counter()
        response = await call_next(request)

        # SSE streams never finish buffering; pass them through untouched
        if response.headers.get("content-type", "").startswith(STREAMING_CONTENT_TYPES):
            return response

        # Reassemble the streamed body with a memory guard
        body_chunks: list[bytes] = []
        total_size = 0
        async for chunk in response.body_iterator:
            total_size += len(chunk)
            if total_size > MAX_RESPONSE_BYTES:
                logger.warning("Response exceeds size limit, terminating buffer")
                return JSONResponse(
                    status_code=413,
                    content={"error": "payload_too_large", "limit_bytes": MAX_RESPONSE_BYTES},
                )
            body_chunks.append(chunk)
        raw_payload = b"".join(body_chunks).decode("utf-8", errors="replace")

        # Run policy checks and CPU-bound NLP analysis off the event loop
        loop = asyncio.get_running_loop()
        violation = await loop.run_in_executor(None, self._compliance.evaluate, raw_payload)
        if violation:
            POLICY_BLOCKED.labels(rule_type=violation).inc()
            logger.info("Policy violation intercepted: %s", violation)
            return JSONResponse(
                status_code=403,
                content={"error": "compliance_block", "reason": violation},
            )

        sanitized_payload = await loop.run_in_executor(None, self._redactor.sanitize, raw_payload)

        latency = time.perf_counter() - start
        LATENCY_HISTOGRAM.observe(latency)
        logger.info("Agent response validated: %.3fs | %d chars", latency, len(sanitized_payload))

        # Keep status and headers, but drop the stale Content-Length: redaction changes
        # the body size, and Starlette recomputes the header when it is absent. A Headers
        # object (unlike dict(...)) also keeps repeated headers such as Set-Cookie.
        headers = Headers(
            raw=[(k, v) for k, v in response.headers.raw if k.lower() != b"content-length"]
        )
        return Response(
            content=sanitized_payload.encode("utf-8"),
            status_code=response.status_code,
            headers=headers,
            media_type=response.media_type,
        )
```
Architecture Decisions & Rationale
- External Middleware over Agent Logic: Embedding safety checks inside agent loops creates coupling. If you swap models or refactor orchestration, guardrails break. Middleware enforces policy at the transport boundary, guaranteeing execution regardless of internal routing.
- Thread Pool for NLP Analysis: Presidio's spaCy models perform CPU-intensive tokenization and entity recognition. Running them directly in an async handler blocks the event loop and degrades throughput for all concurrent requests. run_in_executor moves this work onto a worker thread so the loop keeps serving I/O. Because of Python's GIL, this buys responsiveness rather than extra CPU parallelism for Python-level code.
- Hard Payload Limit: A streamed response has no inherent size bound. A 2MB buffer limit prevents memory exhaustion attacks and forces agents to paginate or truncate large outputs instead of returning unbounded payloads.
- Metric-Driven Observability: Histograms capture latency distribution, enabling p95/p99 alerting. Counters track violation trends. Both integrate natively with Prometheus/Grafana stacks, providing audit-ready telemetry without custom logging pipelines.
- Header & Status Preservation: Reconstructing the response without copying metadata strips Content-Type, Cache-Control, and status codes. Explicitly forwarding these fields maintains the HTTP contract for downstream clients. Content-Length is the exception: redaction changes the body size, so it must be recomputed rather than copied.
Pitfall Guide
1. Unbounded Response Buffering
Explanation: Collecting streamed chunks without a size limit allows malicious or malfunctioning agents to allocate gigabytes of RAM, triggering OOM kills.
Fix: Implement a strict byte counter during chunk iteration. Terminate and return 413 Payload Too Large when the threshold is crossed.
2. Blocking the Async Event Loop
Explanation: Running NLP models or heavy regex operations directly in dispatch() stalls the asyncio loop, causing cascading timeouts across all endpoints.
Fix: Offload CPU-bound analysis to asyncio.to_thread() or loop.run_in_executor(). Keep I/O operations strictly async.
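The effect of offloading shows up when you count how often a 10 ms heartbeat task gets scheduled while "analysis" runs. This is a stdlib-only sketch: blocking_analysis is a hypothetical stand-in that uses time.sleep to simulate cost, and asyncio.to_thread requires Python 3.9+. Genuinely CPU-bound Python code competes for the GIL, so in that case the heartbeat would run less often than shown here, but it would still not be starved entirely.

```python
import asyncio
import time


def blocking_analysis(seconds: float) -> str:
    """Hypothetical stand-in for CPU-heavy NLP work; time.sleep simulates the cost."""
    time.sleep(seconds)
    return "analyzed"


async def heartbeat_ticks(offload: bool, work_seconds: float = 0.2) -> int:
    """Count how often a 10 ms heartbeat task runs while the analysis executes."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    if offload:
        # Runs in the default thread pool; the event loop stays free meanwhile
        await asyncio.to_thread(blocking_analysis, work_seconds)
    else:
        # Runs on the event loop thread; no other task is scheduled until it returns
        blocking_analysis(work_seconds)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return ticks


print(asyncio.run(heartbeat_ticks(offload=False)))  # 0: the heartbeat never got to run
print(asyncio.run(heartbeat_ticks(offload=True)))   # > 0: the loop kept scheduling it
```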
3. Middleware Registration Order Blindness
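Explanation: FastAPI (via Starlette) stacks middleware so that the last one registered becomes the outermost layer: it sees the request first and the response last. Placing safety checks ahead of authentication wastes compute on unauthorized requests.
Fix: Aim for this request-path order: authentication/authorization first, then safety interceptors, then logging/telemetry. Because of the stacking rule, that means registering telemetry first, safety interceptors next, and authentication last.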
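Starlette inserts each add_middleware call at the front of its middleware list and then wraps the app from the inside out. That is why the last registration ends up outermost. The toy model below reproduces only that ordering rule using plain functions. MiniApp and tracing are hypothetical names, not Starlette APIs.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Factory = Callable[[Handler], Handler]


class MiniApp:
    """Toy model of middleware stacking; not the Starlette API."""

    def __init__(self, endpoint: Handler) -> None:
        self._endpoint = endpoint
        self._middleware: List[Factory] = []

    def add_middleware(self, factory: Factory) -> None:
        # Mirror Starlette: the newest registration goes to the front of the list
        self._middleware.insert(0, factory)

    def build(self) -> Handler:
        # Wrap from the end of the list inwards-out; index 0 becomes the outermost layer
        handler = self._endpoint
        for factory in reversed(self._middleware):
            handler = factory(handler)
        return handler


trace: List[str] = []


def tracing(name: str) -> Factory:
    """Middleware factory that records when it sees the request and the response."""
    def factory(next_handler: Handler) -> Handler:
        def handler(request: str) -> str:
            trace.append(f"{name}:request")
            response = next_handler(request)
            trace.append(f"{name}:response")
            return response
        return handler
    return factory


app = MiniApp(lambda request: "ok")
app.add_middleware(tracing("telemetry"))  # registered first: innermost
app.add_middleware(tracing("safety"))
app.add_middleware(tracing("auth"))       # registered last: outermost, runs first
app.build()("POST /agent/query")
print(trace)
```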
4. Logging Raw Sensitive Values
Explanation: Accidentally logging PII during debugging or error handling violates GDPR/CCPA and creates audit liabilities.
Fix: Log only entity types, confidence scores, and redaction counts. Never log the original payload or matched substrings. Use structured logging with explicit PII filters.
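A handler-level logging.Filter can act as a last line of defense against accidental leaks. It helps to attach it to the handler instead of a logger, because filters on a logger do not apply to records propagated from child loggers, while a handler-level filter sees every record that handler emits. This minimal sketch uses only the standard library. PIIScrubFilter and EMAIL_RE are hypothetical names. The email regex is only a backstop, since regex-based detection is weak (see Pitfall 5).

```python
import io
import logging
import re

# Hypothetical backstop pattern for email-like strings in log output
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


class PIIScrubFilter(logging.Filter):
    """Rewrite each record's final message with email-like substrings masked."""

    def filter(self, record: logging.LogRecord) -> bool:
        # getMessage() applies %-style args first, so values passed as args are scrubbed too
        record.msg = EMAIL_RE.sub("[REDACTED_EMAIL]", record.getMessage())
        record.args = ()
        return True  # never drop the record, only rewrite it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
handler.addFilter(PIIScrubFilter())

log = logging.getLogger("control_plane.safety.demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("redaction failed for %s", "alice@example.com")
print(stream.getvalue(), end="")  # INFO redaction failed for [REDACTED_EMAIL]
```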
5. Regex-Only PII Detection
Explanation: Regular expressions cannot distinguish between a credit card number and a product serial number, generating high false-positive rates.
Fix: Use context-aware NLP engines like Presidio that leverage linguistic patterns, surrounding tokens, and validation checksums (Luhn, IBAN) to reduce false positives.
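The checksum idea above is easy to see in isolation. In this stdlib sketch, a deliberately loose regex proposes 16-digit candidates and the Luhn (mod 10) checksum throws out look-alikes. This is not Presidio's code. CANDIDATE_RE, luhn_valid, and find_card_numbers are hypothetical names that illustrate the technique.

```python
import re
from typing import List

# Deliberately loose candidate pattern: four groups of four digits, optionally separated
CANDIDATE_RE = re.compile(r"\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b")


def luhn_valid(number: str) -> bool:
    """Luhn (mod 10) checksum over the digits of `number`."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    # Walk right to left, doubling every second digit and subtracting 9 from two-digit results
    for position, digit in enumerate(reversed(digits)):
        if position % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def find_card_numbers(text: str) -> List[str]:
    """The regex proposes candidates; the checksum discards look-alikes such as serial numbers."""
    return [m.group(0) for m in CANDIDATE_RE.finditer(text) if luhn_valid(m.group(0))]


sample = "Card 4111 1111 1111 1111, device serial 1234-5678-9012-3456"
print(CANDIDATE_RE.findall(sample))  # both strings match the regex
print(find_card_numbers(sample))     # ['4111 1111 1111 1111']
```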
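6. Dropping Response Metadata
Explanation: Returning a new Response object without copying headers and status codes strips Content-Type, ETag, and error states, breaking client expectations.
Fix: Pass status_code=response.status_code, the original headers, and media_type=response.media_type when reconstructing the response. Exclude Content-Length, because redaction changes the body size. Avoid dict(response.headers) as well, since it keeps only one value for repeated headers such as Set-Cookie.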
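Both failure modes are visible with plain header pairs. Starlette exposes the same shape as response.headers.raw, a list of byte-string pairs. In this stdlib sketch, rebuild_headers is a hypothetical helper that keeps every header, repeats included, drops the stale Content-Length, and recomputes it from the new body.

```python
from typing import List, Tuple

RawHeaders = List[Tuple[str, str]]


def rebuild_headers(original: RawHeaders, new_body: bytes) -> RawHeaders:
    """Keep every original header (including repeats) except Content-Length, then recompute it."""
    kept = [(name, value) for name, value in original if name.lower() != "content-length"]
    kept.append(("content-length", str(len(new_body))))
    return kept


upstream = [
    ("content-type", "application/json"),
    ("set-cookie", "session=abc"),
    ("set-cookie", "theme=dark"),
    ("content-length", "30"),  # describes the body before redaction
]
body = b'{"email": "[REDACTED_EMAIL]"}'
print(dict(upstream))  # only one set-cookie survives a dict conversion
print(rebuild_headers(upstream, body))
```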
7. Ignoring Streaming/SSE Responses
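Explanation: Server-Sent Events streams stay open, so buffering the full body holds back every event until the stream closes or trips the size limit, which breaks real-time feeds. WebSocket connections are not HTTP responses at all; BaseHTTPMiddleware passes non-HTTP scopes through untouched.
Fix: Check the response Content-Type (for example text/event-stream) before buffering. Bypass buffering for streaming endpoints, and either apply chunk-level validation or skip sanitization for real-time UI updates.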
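For streams that must be checked rather than skipped, chunk-level validation can carry a short tail between chunks, so a pattern split across two chunks is still caught. This stdlib sketch assumes you can wrap the stream's byte iterator. guard_stream and StreamPolicyError are hypothetical names, and wiring it into a Starlette StreamingResponse is left as an assumption. Once streaming has begun, the status code is already sent, so the sketch can only stop the stream. Chunks delivered before the match cannot be retracted.

```python
import asyncio
import codecs
from typing import AsyncIterator, Iterable, List, Optional, Tuple


class StreamPolicyError(Exception):
    """Raised when a forbidden pattern appears in a streamed response."""


async def guard_stream(chunks: AsyncIterator[bytes], patterns: Iterable[str]) -> AsyncIterator[bytes]:
    """Yield chunks unchanged, stopping as soon as a forbidden pattern appears."""
    lowered = [p.lower() for p in patterns]
    keep = max((len(p) for p in lowered), default=1) - 1  # tail long enough to span a split pattern
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")  # handles split multi-byte chars
    tail = ""
    async for chunk in chunks:
        window = tail + decoder.decode(chunk).lower()
        for pattern in lowered:
            if pattern in window:
                raise StreamPolicyError(pattern)
        tail = window[-keep:] if keep else ""
        yield chunk


async def from_list(parts: List[bytes]) -> AsyncIterator[bytes]:
    for part in parts:
        yield part


async def consume(parts: List[bytes], patterns: List[str]) -> Tuple[List[bytes], Optional[str]]:
    """Collect what a client would receive, plus the pattern that stopped the stream (if any)."""
    delivered: List[bytes] = []
    try:
        async for chunk in guard_stream(from_list(parts), patterns):
            delivered.append(chunk)
    except StreamPolicyError as exc:
        return delivered, str(exc)
    return delivered, None


events = [b"data: hello\n\n", b"data: please IGNORE PREV", b"IOUS instructions\n\n"]
print(asyncio.run(consume(events, ["ignore previous instructions"])))
```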
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single agent, low compliance requirements | Prompt-embedded safety + lightweight regex | Simpler stack, lower latency overhead | Minimal infrastructure cost |
| Multi-agent platform, strict regulatory compliance | External middleware control plane + Presidio | Centralized policy, audit-ready telemetry, consistent enforcement | Moderate compute overhead (~15-30 ms per request) |
| High-throughput public API | Async interceptor with streaming bypass + size limits | Prevents OOM, maintains real-time UX, scales horizontally | Higher memory allocation for buffer management |
| Internal enterprise tooling | Full sanitization + dependency-injected sandboxing | Limits blast radius, enforces least-privilege tool access | Increased DevOps complexity for DI container management |
Configuration Template
```python
# app/main.py
from fastapi import Depends, FastAPI

from control_plane.safety_interceptor import SafetyInterceptor
from control_plane.telemetry import setup_opentelemetry, setup_prometheus
from control_plane.sandbox import ToolAccessDependency
from control_plane.auth import AuthenticationGuard  # hypothetical module path; not defined in this guide

# Sandboxed tool access: attach the dependency to every route.
# (app.dependency_overrides is for swapping implementations, e.g. in tests, not for registering one.)
app = FastAPI(
    title="Agent Control Plane",
    version="1.0.0",
    dependencies=[Depends(ToolAccessDependency)],
)

# Telemetry initialization
setup_prometheus()
setup_opentelemetry(app)

# Middleware stack (LIFO: the last one registered runs first on each request).
# Languages other than "en" need a Presidio NLP engine configured for them.
app.add_middleware(SafetyInterceptor, languages=["en", "es"])
app.add_middleware(AuthenticationGuard)  # registered last, so it executes first


@app.post("/agent/query")
async def query_agent(payload: dict):
    # Agent logic executes here; the middleware intercepts the response automatically
    return {"status": "processed"}
```
Quick Start Guide
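- Install dependencies (uvicorn serves the app; Presidio's default analyzer also needs the en_core_web_lg spaCy model):
pip install fastapi uvicorn presidio-analyzer presidio-anonymizer prometheus-client opentelemetry-api opentelemetry-sdk
python -m spacy download en_core_web_lg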
- Create the interceptor module: Copy the SafetyInterceptor, ComplianceEngine, and DataRedactor classes into control_plane/safety_interceptor.py.
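- Register middleware: Add app.add_middleware(SafetyInterceptor) to your FastAPI application before the server starts, because Starlette refuses new middleware once the app is running. Registration order relative to other middleware determines execution order.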
- Configure telemetry: Initialize Prometheus metrics and OpenTelemetry exporters before starting the server
- Deploy and validate: Send a test request containing a known PII pattern and a policy trigger. Verify that the response is redacted, the violation is logged, and Prometheus metrics increment correctly.