Step 2: Native Audit Extension Deployment
Enable database-native auditing rather than relying on third-party agents that proxy traffic. For PostgreSQL, deploy pgAudit. For MySQL, use the audit_log plugin. For SQL Server, configure SQL Server Audit. Native extensions write directly to the server's log subsystem, bypassing application-layer overhead.
Step 3: Log Shipping & Normalization
Ship audit logs to a centralized, immutable store. Use filebeat or fluentd to tail audit logs, parse JSON/CSV output, and push to a message bus (Kafka, AWS Kinesis, or Google Pub/Sub). Normalize schema differences into a unified event structure:
interface AuditEvent {
timestamp: string;
db_instance_id: string;
classification: 'public' | 'internal' | 'restricted' | 'regulated';
event_type: 'login' | 'query' | 'ddl' | 'privilege_change' | 'config_change';
user: string;
source_ip: string;
database: string;
schema: string;
object: string;
statement: string;
session_id: string;
raw_log: string;
}
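As a rough illustration of the normalization step, the sketch below maps one pgAudit log line onto the unified structure. It assumes pgAudit's documented comma-separated payload after the `AUDIT: ` marker (audit type, statement ID, substatement ID, class, command, object type, object name, statement, parameter). The `LineContext` type and the `normalizePgAudit` and `splitCsv` helpers are hypothetical names introduced here. Fields that pgAudit does not emit in its payload (user, source IP, session, classification) are assumed to come from the shipper or from `log_line_prefix`. Login and configuration events are not covered, since they would come from other log sources.

```typescript
// Hypothetical context supplied by the shipper or parsed from log_line_prefix.
interface LineContext {
  timestamp: string;
  db_instance_id: string;
  classification: 'public' | 'internal' | 'restricted' | 'regulated';
  user: string;
  source_ip: string;
  database: string;
  session_id: string;
}

type EventType = 'login' | 'query' | 'ddl' | 'privilege_change' | 'config_change';

// Split one CSV record, honoring double-quoted fields and "" escapes.
function splitCsv(record: string): string[] {
  const fields: string[] = [];
  let current = '';
  let quoted = false;
  for (let i = 0; i < record.length; i++) {
    const ch = record.charAt(i);
    if (quoted) {
      if (ch === '"' && record.charAt(i + 1) === '"') { current += '"'; i++; }
      else if (ch === '"') quoted = false;
      else current += ch;
    } else if (ch === '"') quoted = true;
    else if (ch === ',') { fields.push(current); current = ''; }
    else current += ch;
  }
  fields.push(current);
  return fields;
}

// Map pgAudit statement classes onto the unified event_type values.
const CLASS_TO_EVENT: Record<string, EventType | undefined> = {
  READ: 'query',
  WRITE: 'query',
  DDL: 'ddl',
  ROLE: 'privilege_change',
};

// Returns a normalized event, or null when the line is not a mappable pgAudit entry.
function normalizePgAudit(line: string, ctx: LineContext) {
  const marker = 'AUDIT: ';
  const start = line.indexOf(marker);
  if (start === -1) return null;
  const f = splitCsv(line.slice(start + marker.length));
  if (f.length < 8) return null;
  const eventType = CLASS_TO_EVENT[f[3] ?? ''];
  if (!eventType) return null;
  const objectName = f[6] ?? '';
  const dot = objectName.indexOf('.');
  return {
    ...ctx,
    event_type: eventType,
    schema: dot === -1 ? '' : objectName.slice(0, dot),
    object: dot === -1 ? objectName : objectName.slice(dot + 1),
    statement: f[7] ?? '',
    raw_log: line,
  };
}
```

Returning null for unrecognized lines lets the caller route them to a dead-letter topic instead of dropping them silently.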
Step 4: Policy Engine Implementation
Build a TypeScript policy engine that consumes normalized events, evaluates rules, and emits alerts or automated actions. The engine must support rule versioning, hot-reloading, and idempotent execution.
import { Kafka, logLevel } from 'kafkajs';
import { z } from 'zod';
const kafka = new Kafka({
clientId: 'db-audit-engine',
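// KAFKA_BROKERS is a comma-separated list, so split it into individual broker addresses
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),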
logLevel: logLevel.WARN,
});
const consumer = kafka.consumer({ groupId: 'audit-policy-evaluator' });
const admin = kafka.admin();
const AuditEventSchema = z.object({
timestamp: z.string(),
db_instance_id: z.string(),
classification: z.enum(['public', 'internal', 'restricted', 'regulated']),
event_type: z.enum(['login', 'query', 'ddl', 'privilege_change', 'config_change']),
user: z.string(),
source_ip: z.string(),
database: z.string(),
schema: z.string(),
object: z.string(),
statement: z.string(),
session_id: z.string(),
});
type AuditEvent = z.infer<typeof AuditEventSchema>;
const POLICIES = [
{
id: 'POL-001',
name: 'Restricted DB Public Access',
description: 'Alert on any login to restricted databases from non-VPC IPs',
evaluate: (event: AuditEvent) =>
event.classification === 'restricted' &&
event.event_type === 'login' &&
// RFC 1918 private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
!/^(10\.|192\.168\.|172\.(1[6-9]|2\d|3[01])\.)/.test(event.source_ip),
severity: 'critical',
action: 'revoke_session' as const,
},
{
id: 'POL-002',
name: 'Schema Modification on Production',
description: 'Block DDL on production databases outside maintenance windows',
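// NOTE: no maintenance-window check is applied here, so every DDL statement
// on a database whose name contains 'prod' matches.
evaluate: (event: AuditEvent) =>
event.event_type === 'ddl' &&
event.database.includes('prod'),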
severity: 'high',
action: 'quarantine_user' as const,
},
{
id: 'POL-003',
name: 'Excessive Privilege Grant',
description: 'Detect GRANT ALL or role escalation on regulated data',
evaluate: (event: AuditEvent) =>
event.event_type === 'privilege_change' &&
event.statement.toUpperCase().includes('GRANT ALL') &&
event.classification === 'regulated',
severity: 'critical',
action: 'rollback_and_alert' as const,
},
];
async function evaluatePolicy(event: AuditEvent) {
for (const policy of POLICIES) {
if (policy.evaluate(event)) {
console.warn(`[ALERT] ${policy.id} triggered on ${event.db_instance_id}`, {
severity: policy.severity,
action: policy.action,
event,
});
await dispatchRemediation(policy, event);
}
}
}
async function dispatchRemediation(policy: (typeof POLICIES)[number], event: AuditEvent) {
switch (policy.action) {
case 'revoke_session':
// Execute pg_cancel_backend or terminate connection via secure admin API
break;
case 'quarantine_user':
// Revoke login privileges temporarily, notify security team
break;
case 'rollback_and_alert':
// Execute inverse DDL if safe, escalate to human review
break;
}
}
async function startAuditPipeline() {
await admin.connect();
await consumer.connect();
await consumer.subscribe({ topic: process.env.AUDIT_TOPIC || 'db-audit-events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const raw = message.value?.toString();
if (!raw) return;
try {
const event = AuditEventSchema.parse(JSON.parse(raw));
await evaluatePolicy(event);
} catch (err) {
console.error('[AUDIT-ENGINE] Parse/Policy error:', err);
}
},
});
}
startAuditPipeline().catch(console.error);
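The engine above does not yet show the idempotent execution that this step calls for. Kafka consumers can see the same message more than once after a rebalance or restart, so a remediation such as revoking a session could fire twice. One possible approach, sketched below, is to derive a key from the policy and the event and to act only the first time that key is claimed. `VersionedPolicy`, `remediationKey`, and `RemediationLedger` are hypothetical names, the `version` field is an assumption about how rule versioning might be represented, and the in-memory map stands in for a shared store that a multi-instance deployment would need.

```typescript
// Hypothetical shape: a policy identifier plus a rule version number.
interface VersionedPolicy {
  id: string;
  version: number;
}

// The subset of event fields assumed to identify one audited action.
interface EventIdentity {
  db_instance_id: string;
  session_id: string;
  timestamp: string;
  statement: string;
}

// Build a deterministic key: the same policy version and event always yield the same string.
function remediationKey(policy: VersionedPolicy, event: EventIdentity): string {
  return JSON.stringify([
    policy.id,
    policy.version,
    event.db_instance_id,
    event.session_id,
    event.timestamp,
    event.statement,
  ]);
}

// In-memory ledger of claimed keys with a time-to-live.
class RemediationLedger {
  private seen = new Map<string, number>(); // key -> expiry (epoch ms)
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // Returns true the first time a key is claimed within the TTL, false on repeats.
  claim(key: string): boolean {
    const t = this.now();
    this.seen.forEach((expiry, k) => {
      if (expiry <= t) this.seen.delete(k); // drop expired entries
    });
    if (this.seen.has(key)) return false;
    this.seen.set(key, t + this.ttlMs);
    return true;
  }
}
```

In `evaluatePolicy`, the call to `dispatchRemediation` could then be guarded with `ledger.claim(remediationKey(policy, event))`. Including the version in the key means a corrected rule is allowed to act again on a replayed event.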
Step 5: Encryption & Secret Verification
Audit systems must verify that data-at-rest encryption is enabled, TLS is enforced for connections, and credentials are rotated. Integrate with your secret manager (Vault, AWS Secrets Manager, GCP Secret Manager) to validate rotation timestamps and detect static credentials in connection strings or environment variables.
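The rotation and static-credential checks can be sketched as pure functions, which keeps them independent of any particular secret manager. The `SecretMetadata` shape, the function names, and the idea of passing rotation timestamps in as ISO 8601 strings are assumptions made for illustration. A real integration would first fetch that metadata from Vault, AWS Secrets Manager, or GCP Secret Manager through its own API. The connection-string check only recognizes URL-style strings such as `postgres://user:password@host/db`; other formats would need their own detection.

```typescript
// Hypothetical metadata shape; lastRotated is an ISO 8601 timestamp.
interface SecretMetadata {
  name: string;
  lastRotated: string;
}

const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Return the names of secrets older than maxAgeDays.
// Unparseable timestamps are treated as stale so they get reviewed.
function findStaleSecrets(
  secrets: SecretMetadata[],
  maxAgeDays: number,
  now: Date = new Date(),
): string[] {
  return secrets
    .filter((s) => {
      const ageMs = now.getTime() - Date.parse(s.lastRotated);
      return Number.isNaN(ageMs) || ageMs > maxAgeDays * MS_PER_DAY;
    })
    .map((s) => s.name);
}

// True when a URL-style connection string embeds a password.
function hasInlinePassword(connectionString: string): boolean {
  try {
    return new URL(connectionString).password !== '';
  } catch {
    return false; // not URL-shaped
  }
}

// Return the names of environment variables whose values embed a password.
function findStaticCredentials(env: Record<string, string | undefined>): string[] {
  return Object.entries(env)
    .filter(([, value]) => value !== undefined && hasInlinePassword(value))
    .map(([key]) => key);
}
```

Running `findStaticCredentials(process.env)` at engine startup is one way to surface embedded passwords before they reach production.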
Step 6: Alerting & Immutable Storage
Route validated alerts to your SIEM, PagerDuty, or Slack channels. Store raw audit logs in immutable object storage (S3 with Object Lock, GCS WORM, Azure Immutable Blob) with retention policies aligned to compliance requirements. Never store audit logs on the same disk or instance as the target database.
Architecture Rationale
- Native extensions over agents: Eliminates network proxy overhead, captures DDL/privilege changes that agents miss, and reduces attack surface.
- Event-driven pipeline: Decouples collection from evaluation. Scales horizontally, tolerates consumer downtime because the broker retains events, and supports replay for forensics.
- TypeScript policy engine: Provides type safety, hot-reload capabilities, and seamless integration with modern observability stacks. Rule evaluation runs in-memory, avoiding database round-trips.
- Immutable storage + cryptographic hashing: Prevents log tampering, satisfies compliance chain-of-custody requirements, and enables cryptographic verification during audits.
Pitfall Guide
- Auditing without a baseline: Enabling verbose logging on an unclassified environment generates noise that drowns signal. Always inventory assets, classify data, and map baseline privilege matrices before activating policies.
- Over-auditing everything: Logging every SELECT on a high-throughput OLTP system degrades performance and fills storage. Scope audits by sensitivity classification, user role, and event type. Use sampling only when mathematically validated for compliance.
- Ignoring application-layer auth bypass: Database audits capture SQL execution, not application logic. If your app uses connection pooling with a single service account, you lose user-level attribution. Implement row-level security, application-level query tagging, or proxy authentication to preserve identity context.
- Storing audit logs on the target instance: Co-locating logs with data creates a single point of failure and violates tamper-resistance requirements. Ship logs to external, immutable storage immediately, using a log shipper or the database's native log export to an external endpoint.
- Treating compliance as security: Passing a SOC 2 audit does not mean your database is secure. Compliance frameworks measure control existence, not runtime effectiveness. Map compliance requirements to continuous policies, not snapshot evidence.
- Neglecting network & transport security: Auditing queries without verifying TLS enforcement, VPC peering, or IP allowlisting leaves the database exposed to man-in-the-middle or lateral movement attacks. Integrate network telemetry with audit events to correlate access patterns with infrastructure posture.
- Failing to rotate audit credentials: Audit collection agents, Kafka producers, and SIEM integrators often use long-lived credentials. Rotate them automatically, enforce least privilege, and monitor for credential reuse across environments.
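The application-level query tagging mentioned in the pitfalls above can be sketched as a pair of helpers: one that the application calls before sending SQL through the shared service account, and one that the normalizer calls to recover the end user. The comment format `/* app_user=<id> */` and the names `tagQuery` and `extractAppUser` are assumptions for illustration, and the approach only works if the audit log preserves leading comments in the logged statement.

```typescript
// Hypothetical tag format: /* app_user=<id> */ prepended to each statement.
const SAFE_USER = /^[A-Za-z0-9_.@-]+$/;
const TAG_PATTERN = /^\/\* app_user=([A-Za-z0-9_.@-]+) \*\/\s*/;

// Prefix a statement with the end user's identity.
// Rejects identifiers that could close the comment early and inject SQL.
function tagQuery(sql: string, appUser: string): string {
  if (!SAFE_USER.test(appUser)) {
    throw new Error('appUser contains characters that are unsafe inside a SQL comment');
  }
  return `/* app_user=${appUser} */ ${sql}`;
}

// Recover the identity from a logged statement; appUser is null when no tag is present.
function extractAppUser(statement: string): { appUser: string | null; sql: string } {
  const match = TAG_PATTERN.exec(statement);
  if (!match) return { appUser: null, sql: statement };
  return { appUser: match[1] ?? null, sql: statement.slice(match[0].length) };
}
```

A tag is only as trustworthy as the application that writes it, so it supplements rather than replaces row-level security or proxy authentication.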
Best Practices from Production:
- Implement policy versioning with rollback capability. Broken rules can block legitimate traffic.
- Use cryptographic hash chains for audit log integrity. Verify hashes during compliance reviews.
- Run red-team simulations quarterly to validate that policies detect realistic attack paths.
- Separate read-only audit viewers from write-capable remediation roles. Principle of least privilege applies to auditors too.
- Document false positive thresholds and escalation paths. Engineering teams will disable pipelines that lack clear triage workflows.
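A hash chain for audit log integrity, as recommended in the list above, can be sketched in a few lines: each record stores the hash of its predecessor, so altering or removing any record invalidates every hash that follows. The record shape, the all-zero genesis value, and the function names below are illustrative assumptions, and the sketch uses SHA-256 from Node's built-in `node:crypto` module.

```typescript
import { createHash } from 'node:crypto';

interface ChainedRecord {
  payload: string;  // the raw audit log line or batch
  prevHash: string; // hash of the previous record, or GENESIS for the first
  hash: string;     // SHA-256 over prevHash and payload
}

// Assumed starting value for the first record in a chain.
const GENESIS = '0'.repeat(64);

function linkHash(prevHash: string, payload: string): string {
  return createHash('sha256').update(prevHash).update('\n').update(payload).digest('hex');
}

// Return a new chain with the payload appended; the input chain is not mutated.
function appendRecord(chain: ChainedRecord[], payload: string): ChainedRecord[] {
  const last = chain[chain.length - 1];
  const prevHash = last ? last.hash : GENESIS;
  return [...chain, { payload, prevHash, hash: linkHash(prevHash, payload) }];
}

// Return the index of the first record that fails verification, or -1 if the chain is intact.
function verifyChain(chain: ChainedRecord[]): number {
  let prev = GENESIS;
  let index = 0;
  for (const record of chain) {
    if (record.prevHash !== prev || record.hash !== linkHash(prev, record.payload)) {
      return index;
    }
    prev = record.hash;
    index++;
  }
  return -1;
}
```

Periodically anchoring the latest hash in a separate system makes truncation of the chain's tail detectable as well, since a shortened chain still verifies on its own.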
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Cloud-managed DB (RDS, Cloud SQL, Aurora) | Native extensions + managed log export to S3/Cloud Storage | Cloud providers restrict agent installation; native logs integrate with managed services | Low infrastructure cost, higher storage fees for retention |
| Self-hosted high-traffic OLTP | pgAudit/audit_log + Kafka pipeline + TypeScript engine | Decoupled architecture prevents I/O bottlenecks; scales with partitioned topics | Moderate compute for pipeline, high ROI via breach prevention |
| Compliance-heavy (HIPAA, PCI-DSS) | Immutable WORM storage + cryptographic log chaining + automated evidence generation | Satisfies chain-of-custody requirements; reduces auditor friction | Higher storage costs, lower audit preparation labor |
| Startup / low-traffic dev environments | Lightweight filebeat + local policy evaluation + Slack alerts | Minimizes operational overhead while maintaining visibility | Near-zero infrastructure cost, acceptable risk for non-prod |
Configuration Template
PostgreSQL pgAudit Configuration (postgresql.conf)
shared_preload_libraries = 'pgaudit'
pgaudit.log = 'read, write, ddl, role'
pgaudit.log_catalog = off
pgaudit.log_parameter = on
pgaudit.log_relation = on
pgaudit.log_level = warning
pgaudit.log_statement_once = off
log_destination = 'stderr'
logging_collector = on
log_directory = 'pg_audit_logs'
log_filename = 'audit-%Y-%m-%d.log'
log_rotation_age = 1d
log_rotation_size = 100MB
Filebeat Audit Shipper (filebeat.yml)
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/lib/postgresql/pg_audit_logs/audit-*.log
    fields:
      audit_source: postgresql
    fields_under_root: true
    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'
    multiline.negate: true
    multiline.match: after
output.kafka:
  hosts: ["kafka-broker-1:9092", "kafka-broker-2:9092"]
  topic: 'db-audit-events'
  partition.round_robin:
    reachable_only: true
  compression: gzip
  max_retries: 5
  required_acks: 1
TypeScript Policy Engine Environment (.env)
KAFKA_BROKERS=kafka-broker-1:9092,kafka-broker-2:9092
AUDIT_TOPIC=db-audit-events
REMEDIATION_API_URL=https://internal-security-api.company.com/v1/quarantine
LOG_RETENTION_DAYS=365
COMPLIANCE_FRAMEWORK=HIPAA
Quick Start Guide
- Enable native auditing: Add pgaudit or equivalent to your database configuration, set classification-based logging levels, and restart the instance.
- Deploy log shipper: Install filebeat on the database host, configure multiline parsing for audit logs, and point output to your Kafka/PubSub topic.
- Launch policy engine: Clone the TypeScript audit processor, set environment variables, and run npm run start. Verify event ingestion via Kafka consumer offset lag.
- Validate detection: Execute a test DDL statement or login from a non-VPC IP. Confirm alert appears in your SIEM/PagerDuty and that remediation actions trigger as configured.
- Lock storage: Configure S3/GCS bucket with Object Lock/WORM compliance mode, set retention policy, and verify cryptographic hash generation for log batches.