ilStr
from typing import Optional
import httpx
import logging
import os
# Logger shared by the webhook handler and the background processing code.
logger = logging.getLogger("automation_engine")
# ASGI application; the route decorators that follow register on this instance.
app = FastAPI(title="Automation Router")
class InboundPayload(BaseModel):
    """Expected JSON body of the webhook ingest endpoint."""
    event_type: str  # kind of event; logged when the trigger is received
    source_id: str  # echoed back to the caller as "event_id" in the acknowledgement
    metadata: dict  # free-form extra data; left out of EventStore's idempotency hash
    recipient_email: Optional[EmailStr] = None  # optional; validated as an email address when present
@app.post("/api/v1/webhook/ingest", status_code=202)
async def receive_trigger(payload: InboundPayload, bg_tasks: BackgroundTasks):
    """Accept a webhook delivery and queue it for background processing.

    Responds with 202 Accepted as soon as the body has passed validation.
    Transformation and dispatch run after the response is sent, so slow
    downstream APIs cannot push the webhook provider into a retry.

    Args:
        payload: Validated webhook body (bodies that do not match
            ``InboundPayload`` never reach this handler).
        bg_tasks: Per-request background task queue provided by FastAPI.

    Returns:
        An acknowledgement echoing the delivery's ``source_id`` as ``event_id``.
    """
    # Lazy %-formatting: the message is only built if INFO is enabled.
    logger.info("Received trigger: %s | Source: %s", payload.event_type, payload.source_id)
    # Offload heavy I/O to background tasks to keep the webhook response fast
    bg_tasks.add_task(process_and_dispatch, payload)
    return {"status": "accepted", "event_id": payload.source_id}
```
**Why this choice:** Webhook providers often retry deliveries if the endpoint doesn't respond within 5–10 seconds. Accepting the payload immediately and deferring processing prevents timeout errors. Pydantic validation fails fast on malformed or schema-invalid JSON, returning a 422 before any downstream API is called.
### Step 2: Implement Idempotent Dispatch Logic
Network retries and platform redeliveries cause duplicate processing. An idempotency layer ensures each event is processed at most once, so a redelivered event does not trigger its side effects a second time.
```python
import hashlib
import json
from datetime import datetime, timezone
class EventStore:
    """In-memory record of the events that have already been seen.

    An event is identified by the SHA-256 digest of its payload with the
    ``metadata`` field left out, so deliveries that differ only in metadata
    are treated as the same event. Fingerprints are only ever added, never
    removed, and live in this object only.
    """

    def __init__(self):
        # Hex digests of every payload fingerprint recorded so far.
        self._processed_ids: set[str] = set()

    @staticmethod
    def _fingerprint(payload: InboundPayload) -> str:
        """Return the SHA-256 hex digest of ``payload`` without ``metadata``."""
        # sort_keys gives a canonical serialisation regardless of field order.
        canonical = json.dumps(payload.model_dump(exclude={"metadata"}), sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def is_duplicate(self, payload: InboundPayload) -> bool:
        """Report whether ``payload`` was seen before, recording it if not.

        Returns:
            True when the same fingerprint was already recorded. Otherwise
            False, after recording the fingerprint so later calls with an
            equivalent payload return True.
        """
        digest = self._fingerprint(payload)
        already_seen = digest in self._processed_ids
        if not already_seen:
            self._processed_ids.add(digest)
        return already_seen
# Module-level store shared by every call to process_and_dispatch.
store = EventStore()


async def process_and_dispatch(payload: InboundPayload):
    """Transform a webhook payload and send it to all targets, once per event.

    Payloads whose fingerprint is already in ``store`` are logged and dropped.

    NOTE(review): ``store.is_duplicate`` records the event *before* the
    transform/dispatch below runs, so if either step raises, a redelivery of
    the same event will be skipped as a duplicate.
    """
    if not store.is_duplicate(payload):
        transformed = await transform_payload(payload)
        await dispatch_to_targets(transformed)
        return
    logger.warning(f"Duplicate event skipped: {payload.source_id}")
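```
**Why this choice:** Hashing the deterministic fields of the payload (everything except `metadata`) creates a lightweight idempotency key. In production, replace the in-memory set with Redis or a database table so the record survives restarts and is shared across worker processes. This prevents duplicate Slack messages, spreadsheet rows, or CRM entries. Note that the event is marked as seen before it is dispatched. If a dispatch fails, remove its key (or record the key only after success) so the provider's redelivery is processed rather than skipped.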
### Step 3: Async Multi-Target Routing
Instead of sequential blocking calls, dispatch to multiple destinations concurrently.
```python
async def dispatch_to_targets(data: dict):
    """Send a transformed event to every downstream target concurrently.

    Each target is posted independently. A failure on one target (network
    error, timeout, or 4xx/5xx response) is logged and does not prevent
    delivery to the others.

    Args:
        data: Transformed event. It must contain ``"summary"`` (Slack message
            text) and ``"crm_fields"`` (Airtable record fields); a missing key
            raises KeyError before any request is sent.

    Returns:
        One entry per target, in target order: the ``httpx.Response`` on
        success, or the exception raised for that target.
    """
    import asyncio  # local import keeps this snippet self-contained

    # Slack incoming-webhook URLs carry their own secret, so Slack gets no
    # Authorization header. The Airtable bearer token is sent to Airtable only,
    # and only when one is configured (never "Bearer None").
    # API_KEY is still honoured for existing deployments.
    airtable_key = os.getenv("AIRTABLE_API_KEY") or os.getenv("API_KEY")
    airtable_headers = {"Authorization": f"Bearer {airtable_key}"} if airtable_key else {}
    targets = [
        (
            "slack",
            os.getenv("SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/PLACEHOLDER"),
            {"text": data["summary"]},
            {},
        ),
        (
            "airtable",
            "https://api.airtable.com/v0/PLACEHOLDER/Leads",
            {"fields": data["crm_fields"]},
            airtable_headers,
        ),
    ]

    async def _post(client, url, body, headers):
        """POST one target and raise on 4xx/5xx so HTTP errors surface like network errors."""
        response = await client.post(url, json=body, headers=headers)
        response.raise_for_status()
        return response

    async with httpx.AsyncClient(timeout=10.0) as client:
        # return_exceptions=True: one failing target must not cancel the others.
        results = await asyncio.gather(
            *(_post(client, url, body, headers) for _, url, body, headers in targets),
            return_exceptions=True,
        )

    for (name, _, _, _), result in zip(targets, results):
        if isinstance(result, BaseException):
            # Log the target name, not the URL: Slack webhook URLs are secrets.
            logger.error("Dispatch to %s failed: %r", name, result)
    return results
```
**Why this choice:** `httpx.AsyncClient` reuses pooled connections across requests. Wrapping the requests in `asyncio.gather` with `return_exceptions=True` means that if one target fails, the others still succeed, so a single downstream outage doesn't crash the entire pipeline.
### Step 4: Deterministic Scheduling Replacement
Cron-style jobs require timezone awareness and graceful shutdown handling.
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = AsyncIOScheduler(timezone="UTC")


@scheduler.scheduled_job(CronTrigger(hour=9, minute=0))
async def daily_metrics_rollup():
    """Fetch today's metrics from the internal API and email a summary.

    Runs every day at 09:00 in the scheduler's timezone (UTC).

    Raises:
        RuntimeError: if no internal service token is configured.
        httpx.HTTPStatusError: if the metrics endpoint returns a 4xx/5xx.
    """
    logger.info("Starting daily metrics aggregation...")
    # INTERNAL_API_TOKEN is the name the deployment config provides;
    # INTERNAL_TOKEN is still honoured for existing environments.
    token = os.getenv("INTERNAL_API_TOKEN") or os.getenv("INTERNAL_TOKEN")
    if not token:
        # Fail with a clear message instead of passing None as a header value.
        raise RuntimeError("INTERNAL_API_TOKEN is not set")
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://internal-api.example.com/v1/metrics/today",
            headers={"X-Service-Token": token},
        )
        resp.raise_for_status()
        payload = resp.json()
    await send_summary_email(payload)


# Start the scheduler inside the server's running event loop rather than at
# import time, when that loop does not exist yet, and stop it on shutdown.
# (FastAPI also offers a lifespan handler for this; on_event keeps the snippet
# independent of how `app` is constructed.)
@app.on_event("startup")
async def _start_scheduler():
    """Start APScheduler on the ASGI server's event loop."""
    scheduler.start()


@app.on_event("shutdown")
async def _stop_scheduler():
    """Stop APScheduler without blocking server shutdown on running jobs."""
    scheduler.shutdown(wait=False)
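```
**Why this choice:** APScheduler integrates cleanly with asyncio event loops and supports persistent job stores. Hardcoding UTC prevents daylight saving drift. The scheduler runs inside the same process as the web server, eliminating the need for separate cron daemons or external task runners. Start it once the server's event loop is running (for example, from a startup hook), not at import time. If you run several worker processes, each one starts its own scheduler and fires every job, so keep the scheduler in a single process.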
Pitfall Guide
1. Silent Webhook Timeouts
Explanation: Processing logic runs synchronously in the request handler. If downstream APIs are slow, the webhook provider retries, causing duplicate executions.
Fix: Always return a 202 Accepted immediately. Offload processing to background tasks or a message queue. Implement idempotency keys to safely handle retries.
2. Unbounded Retry Storms
Explanation: Naive retry loops on failed HTTP requests flood rate-limited APIs, triggering temporary bans.
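Fix: Implement exponential backoff with jitter. httpx's built-in transport retries (`httpx.AsyncHTTPTransport(retries=...)`) only cover connection failures, so wrap calls in a retry decorator that also handles 429/5xx responses and respects Retry-After headers. Cap retries at 3–5 attempts before logging to a dead-letter channel.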
3. Timezone Drift in Scheduled Jobs
Explanation: Using local server time for cron triggers causes jobs to fire twice or skip entirely during DST transitions.
Fix: Force all schedulers and timestamps to UTC. Convert to local time only at the presentation layer (e.g., email templates or dashboard UIs).
4. Blocking I/O in Async Contexts
Explanation: Mixing synchronous libraries (like requests or imaplib) inside async def functions blocks the event loop, degrading throughput for concurrent webhook deliveries.
Fix: Use async-native alternatives (httpx, aiosmtplib, gspread_asyncio). If legacy sync libraries are unavoidable, run them in asyncio.to_thread() to isolate blocking calls.
5. Hardcoded Secrets and Configuration
Explanation: Embedding API keys, webhook URLs, or database credentials in source code creates rotation blind spots and security vulnerabilities.
Fix: Load configuration from environment variables or a secret manager (AWS Secrets Manager, HashiCorp Vault, Doppler). Validate required keys at startup and fail fast if missing.
6. Missing Schema Evolution Handling
Explanation: Upstream APIs change field names or drop optional properties. Rigid parsing crashes the pipeline.
Fix: Use Pydantic models with .model_validate() and explicit Optional types. Log schema mismatches as warnings rather than errors, and implement fallback defaults for non-critical fields.
7. Unmanaged Log Growth
Explanation: Printing raw payloads or verbose debug output fills disk space and degrades I/O performance over time.
Fix: Use structured JSON logging (structlog or python-json-logger). Route logs to stdout for container orchestration and let the container runtime or log collector handle rotation; if you must write log files, configure rotation for them. Set appropriate log levels per environment.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| < 500 ops/month, non-technical team | SaaS Workflow Platform | Zero code, visual debugging, rapid prototyping | $20–$30/mo subscription |
| 500–50k ops/month, predictable schema | Custom Script Engine | Full control, zero marginal cost, native validation | $0 (free-tier hosting) |
| > 50k ops/month, high concurrency | Serverless Functions + Queue | Auto-scaling, isolated failures, pay-per-execution | $0–$5/mo (cloud free tiers) |
| Complex data transformation, ML pipelines | Dedicated Worker + Database | Stateful processing, versioned artifacts, audit trails | $10–$30/mo (managed DB + compute) |
Configuration Template
# docker-compose.yml
# NOTE(review): recent Docker Compose releases treat the top-level `version`
# key as obsolete and ignore it — confirm it is still wanted.
version: "3.9"
services:
  automation-engine:
    build: .
    ports:
      - "8000:8000"
    # Values are interpolated from the environment Compose runs in; the names
    # must match what the application reads with os.getenv().
    # NOTE(review): no visible code reads LOG_LEVEL — confirm it is consumed.
    environment:
      - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}
      - AIRTABLE_API_KEY=${AIRTABLE_API_KEY}
      - INTERNAL_API_TOKEN=${INTERNAL_API_TOKEN}
      - LOG_LEVEL=INFO
    restart: unless-stopped
    # Polls the app's /health endpoint every 30s.
    # NOTE(review): requires curl inside the image — confirm the Dockerfile installs it.
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 5s
      retries: 3
# pyproject.toml
[project]
name = "automation-engine"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn[standard]>=0.27.0",
    "httpx>=0.27.0",
    # The [email] extra is required for pydantic's EmailStr validation.
    "pydantic[email]>=2.6.0",
    # The scheduler code imports apscheduler.schedulers.asyncio.AsyncIOScheduler,
    # a 3.x API that 4.x replaces; cap below 4 so an upgrade cannot break it.
    "apscheduler>=3.10.4,<4",
    "structlog>=24.1.0",
    "python-dotenv>=1.0.0",
]
# main.py (entrypoint)
import structlog
from fastapi import FastAPI
from dotenv import load_dotenv
# Populate os.environ from a local .env file before anything reads configuration.
load_dotenv()
# Render every structlog event as one JSON object carrying an ISO-8601
# timestamp and the log level.
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer()
    ]
)
# ASGI application served by `uvicorn main:app`.
app = FastAPI(title="Automation Engine", version="1.0.0")
@app.get("/health")
async def health_check():
    """Liveness endpoint polled by the container healthcheck.

    Returns:
        ``{"status": "healthy", "timestamp": <current UTC time, ISO-8601>}``.
    """
    from datetime import datetime, timezone

    # structlog's TimeStamper stamps log events only; it never binds a value
    # into contextvars, so the timestamp is computed here directly.
    return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
Quick Start Guide
- Initialize the project: Run `uv init automation-engine && cd automation-engine && uv add fastapi "uvicorn[standard]" httpx "pydantic[email]" "apscheduler<4" structlog python-dotenv`. The `[email]` extra is required for `EmailStr` validation.
- Create environment file: Copy `.env.example` to `.env` and populate `SLACK_WEBHOOK_URL`, `AIRTABLE_API_KEY`, and `INTERNAL_API_TOKEN`.
- Start the engine: Execute `uvicorn main:app --host 0.0.0.0 --port 8000 --reload` (`--reload` is for local development only). Verify the `/health` endpoint returns a response containing `"status": "healthy"`.
- Test webhook ingestion: Send a POST request to `/api/v1/webhook/ingest` with a valid JSON payload. The route must be registered on the same `app` instance that `main.py` serves. Check console output for structured logs and confirm background task execution.
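- Deploy to free-tier host: Build a Docker image (`docker build -t automation-engine .`), push it to a container registry, and deploy it to a container-capable host such as Oracle Cloud Free Tier. A GitHub Actions workflow can automate the build and push. PythonAnywhere does not run Docker images, so deploy there from source instead. Configure environment variables in the hosting dashboard and enable automatic restarts.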