def _request(self, url: str) -> Optional[str]:
    """Fetch *url* and return the response body, retrying on HTTP errors.

    Issues up to ``self.retries`` GET requests through ``self.client``.
    Any ``httpx.HTTPError`` (including the error raised by
    ``raise_for_status`` for a non-success status) triggers another attempt
    after a short, exponentially growing pause.

    Args:
        url: Address of the page to download.

    Returns:
        The response text, or ``None`` when ``self.retries`` is zero or
        negative and therefore no request is attempted.

    Raises:
        RuntimeError: If every attempt fails. The last ``httpx.HTTPError``
            is attached as ``__cause__`` so the root cause stays visible.
    """
    import time  # local import: the file's top-level import block is not in view

    for attempt in range(self.retries):
        try:
            resp = self.client.get(url, headers={"User-Agent": "PriceIntel/1.0"})
            resp.raise_for_status()
        except httpx.HTTPError as exc:
            if attempt == self.retries - 1:
                raise RuntimeError(
                    f"Fetch failed after {self.retries} attempts: {exc}"
                ) from exc
            # Back off before retrying so a struggling server is not hit
            # again immediately: 0.5s, 1s, 2s, ... capped at 8s.
            time.sleep(min(0.5 * 2 ** attempt, 8.0))
        else:
            return resp.text
    return None
def extract(self, target: "PricingTarget") -> Optional[float]:
    """Download the target page and parse a single price out of it.

    Args:
        target: Describes what to fetch and how to locate the price. This
            method reads ``url``, ``parser_type`` and ``selector``.

    Returns:
        The price as a float, or ``None`` when the page is empty, the
        selector/pattern does not match, or the matched text contains no
        parseable number.

    Raises:
        RuntimeError: Propagated from ``self._request`` when fetching fails.
    """
    html = self._request(target.url)
    if not html:
        return None

    if target.parser_type == "css":
        element = BeautifulSoup(html, "html.parser").select_one(target.selector)
        if element is None:
            return None
        text = element.get_text(strip=True)
    else:
        match = re.search(target.selector, html)
        if match is None:
            return None
        # When the pattern declares a capture group, that group is the price.
        # Using the whole match would pull in digits/dots from the anchor
        # text (e.g. "Plan v2.0 ... $1,299.00") and corrupt the number.
        captured = match.group(1) if match.re.groups else None
        text = captured or match.group(0)

    # Strip currency symbols, thousands separators and any other non-numeric
    # characters, keeping only digits and the decimal point.
    cleaned = re.sub(r"[^\d.]", "", text)
    try:
        return float(cleaned)
    except ValueError:
        return None
**Rationale:** `httpx` manages connection reuse and respects timeouts, preventing thread starvation. The retry loop handles transient network failures; pausing between attempts with exponential backoff keeps retries from hammering a struggling server. Separating fetch from parse allows swapping parsers without touching network logic. The regex cleanup step normalizes formats like `$1,299.00` to `1299.00`.
### 3. Storage & Change Detection
SQLite is sufficient for local deployments, but the schema must support time-series queries and idempotent writes. Change detection should ignore micro-fluctuations caused by taxes, shipping, or rounding.
```python
import sqlite3
from contextlib import contextmanager
class PriceRepository:
    """Append-only SQLite store for price snapshots.

    Every call opens its own short-lived connection to ``db_path``, so the
    repository holds no open handle between operations.
    """

    def __init__(self, db_path: str = "pricing_intel.db"):
        """Remember the database path and create the schema if it is missing."""
        self.db_path = db_path
        self._init_schema()

    @contextmanager
    def _connect(self):
        """Yield a connection that commits on success and always closes.

        Rows are returned as ``sqlite3.Row`` so columns can be read by name.
        If the ``with`` body raises, the commit is skipped and closing the
        connection discards the uncommitted changes.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            # sqlite3 opens an implicit transaction before INSERT; without an
            # explicit commit the write is rolled back when the connection
            # closes and the snapshot is silently lost.
            conn.commit()
        finally:
            conn.close()

    def _init_schema(self):
        """Create the snapshot table and its lookup index when absent."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS price_snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    target_id TEXT NOT NULL,
                    raw_value REAL NOT NULL,
                    normalized_value REAL NOT NULL,
                    source TEXT NOT NULL,
                    currency TEXT DEFAULT 'USD',
                    fetched_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_target_time
                ON price_snapshots(target_id, fetched_at DESC)
            """)

    def upsert_snapshot(self, record: "PriceRecord") -> None:
        """Append *record* as a new row.

        Despite the name this never updates an existing row; each call adds
        one snapshot. ``record.fetched_at`` is stored as an ISO-8601 string.
        """
        with self._connect() as conn:
            conn.execute(
                "INSERT INTO price_snapshots (target_id, raw_value, normalized_value, source, currency, fetched_at) VALUES (?, ?, ?, ?, ?, ?)",
                (
                    record.target_id,
                    record.raw_value,
                    record.normalized_value,
                    record.source,
                    record.currency,
                    record.fetched_at.isoformat(),
                ),
            )

    def get_latest(self, target_id: str) -> Optional[float]:
        """Return the most recent normalized price for *target_id*.

        Returns ``None`` when no snapshot exists for the target. Rows with
        identical timestamps are ordered by insertion (highest ``id`` wins).
        """
        with self._connect() as conn:
            row = conn.execute(
                "SELECT normalized_value FROM price_snapshots WHERE target_id = ? ORDER BY fetched_at DESC, id DESC LIMIT 1",
                (target_id,),
            ).fetchone()
        return row["normalized_value"] if row else None
```
**Rationale:** Indexing on `(target_id, fetched_at)` accelerates historical queries. Using `sqlite3.Row` enables dictionary-like access without fragile positional indexing. Despite its name, `upsert_snapshot` always inserts a new row rather than overwriting an existing one, preserving audit trails. Change detection happens at the application layer, not the database layer, allowing configurable thresholds.
### 4. Notification Router
Email alerts should trigger only on meaningful shifts. Deduplicate alerts to prevent inbox flooding during rapid competitor adjustments.
import smtplib
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logger = logging.getLogger(__name__)
class AlertDispatcher:
    """Sends plain-text price-change alerts by e-mail over SMTP-over-SSL.

    Alerts are addressed to the sender's own mailbox. Delivery is best
    effort: failures are logged and never raised to the caller.
    """

    # Upper bound on how long one SMTP conversation may block the caller.
    SMTP_TIMEOUT_SECONDS = 30

    def __init__(self, smtp_host: str, smtp_port: int, sender: str, credentials: tuple):
        """Store the SMTP endpoint, sender address and login credentials.

        ``credentials`` is unpacked into ``server.login(...)`` when sending.
        """
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.sender = sender
        self.credentials = credentials

    def dispatch(self, target_id: str, previous: float, current: float, source: str) -> None:
        """Compose and send one alert describing a price change.

        Args:
            target_id: Identifier of the monitored price.
            previous: Last stored price.
            current: Newly observed price.
            source: Where the price was observed.

        A ``previous`` of zero has no meaningful percentage change; the alert
        is still sent, with the delta reported as ``n/a`` instead of raising
        ``ZeroDivisionError``.
        """
        if previous:
            delta_pct = ((current - previous) / previous) * 100
            trend = delta_pct
            delta_text = f"{delta_pct:+.1f}%"
            subject_delta = f"{abs(delta_pct):.1f}%"
        else:
            trend = current - previous
            delta_text = subject_delta = "n/a"

        if trend > 0:
            direction = "UP"
        elif trend < 0:
            direction = "DOWN"
        else:
            # An unchanged price is neither up nor down.
            direction = "FLAT"

        body = (
            f"Price Signal Detected\n"
            f"Target: {target_id}\n"
            f"Source: {source}\n"
            f"Previous: ${previous:.2f}\n"
            f"Current: ${current:.2f}\n"
            f"Delta: {delta_text} ({direction})\n"
            f"Timestamp: {datetime.now(timezone.utc).isoformat()}"
        )
        msg = MIMEMultipart()
        msg["From"] = self.sender
        msg["To"] = self.sender
        msg["Subject"] = f"[PriceIntel] {target_id} {direction} {subject_delta}"
        msg.attach(MIMEText(body, "plain"))

        try:
            with smtplib.SMTP_SSL(
                self.smtp_host, self.smtp_port, timeout=self.SMTP_TIMEOUT_SECONDS
            ) as server:
                server.login(*self.credentials)
                server.send_message(msg)
            logger.info("Alert dispatched for %s", target_id)
        except Exception as exc:
            # Deliberately broad: a failed notification must not abort the
            # monitoring cycle that triggered it.
            logger.error("Notification failed: %s", exc)
**Rationale:** `SMTP_SSL` on port 465 enforces encrypted transport. Structured logging replaces `print()` statements, enabling integration with log aggregators. Noise filtering is not done here: the orchestrator (see below) compares the magnitude of the relative price change against a configurable threshold before calling `dispatch`.
### 5. Orchestrator
Tie components together with configurable thresholds and graceful degradation.
import logging
from typing import List
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
class PricingEngine:
    """Runs fetch -> store -> compare -> notify for each configured target."""

    def __init__(self, targets: "List[PricingTarget]", repo: "PriceRepository", fetcher: "PriceFetcher", notifier: "AlertDispatcher", threshold: float = 0.02):
        """Wire the collaborators together.

        Args:
            targets: Prices to monitor.
            repo: Snapshot store (``get_latest`` / ``upsert_snapshot``).
            fetcher: Provides ``extract(target)``.
            notifier: Provides ``dispatch(...)`` for alerts.
            threshold: Minimum relative change, as a fraction (0.02 = 2%),
                that triggers an alert.
        """
        self.targets = targets
        self.repo = repo
        self.fetcher = fetcher
        self.notifier = notifier
        self.threshold = threshold  # 2% minimum change to trigger alert

    def run_cycle(self) -> None:
        """Process every target once.

        Each target is handled independently: any exception is logged and
        the loop moves on to the next target.
        """
        for target in self.targets:
            try:
                raw = self.fetcher.extract(target)
                if raw is None:
                    logger.warning("Parser returned None for %s", target.identifier)
                    continue

                record = PriceRecord(
                    target_id=target.identifier,
                    raw_value=raw,
                    normalized_value=raw,
                    source=target.source_domain,
                    currency=target.currency,
                )

                # Read the previous price BEFORE storing the new one;
                # otherwise "latest" is the row just written and the change
                # is always zero.
                prev = self.repo.get_latest(target.identifier)
                self.repo.upsert_snapshot(record)

                if prev is None:
                    # First observation: nothing to compare against yet.
                    continue
                if prev == 0:
                    # A relative change from zero is undefined.
                    logger.warning(
                        "Previous price is zero for %s; skipping change check",
                        target.identifier,
                    )
                    continue

                change_pct = abs((record.normalized_value - prev) / prev)
                if change_pct >= self.threshold:
                    self.notifier.dispatch(target.identifier, prev, record.normalized_value, target.source_domain)
                    logger.info("Threshold breached: %s (%.2f%%)", target.identifier, change_pct * 100)
                else:
                    logger.debug("Within threshold: %s", target.identifier)
            except Exception as exc:
                # Per-target isolation boundary: one bad target must not
                # stop the rest of the cycle.
                logger.error("Cycle failed for %s: %s", target.identifier, exc)
Rationale: The orchestrator isolates execution per target, ensuring one failure doesn't halt the entire cycle. The threshold parameter prevents alert fatigue from rounding differences or temporary cart adjustments. Structured logging provides observability without console clutter.
Pitfall Guide
| Pitfall | Explanation | Fix |
|---|---|---|
| Regex Fragility | Hardcoded patterns break when competitors update markup or add dynamic pricing tiers. | Use CSS selectors as primary, regex as fallback. Version parsers alongside target configs. |
| IP Reputation Damage | Aggressive polling triggers WAF blocks, CAPTCHAs, or permanent bans. | Implement randomized intervals, respect robots.txt, rotate User-Agents, and use residential proxies for high-frequency targets. |
| Timezone Drift | Storing local timestamps corrupts historical comparisons across regions or DST shifts. | Normalize all timestamps to UTC at ingestion. Store timezone metadata separately if needed. |
| Alert Fatigue | Micro-fluctuations (taxes, shipping, currency conversion) trigger constant notifications. | Apply a minimum delta threshold (e.g., 2–5%). Deduplicate alerts within a sliding window (e.g., 24h). |
| Silent Dynamic Failures | JS-rendered prices return None without raising errors, creating false negatives. | Validate parser output against expected ranges. Fallback to playwright for targets with known JS rendering. |
| Missing Data Normalization | Comparing $99 vs €89 or base price vs all-in price skews analysis. | Normalize currencies via exchange rate APIs. Strip shipping/taxes before storage. Tag records with pricing context. |
| Configuration Drift | Hardcoded URLs and selectors require code deployments for minor changes. | Externalize targets to YAML/JSON. Validate configs on startup. Use feature flags for parser toggles. |
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Static SaaS pricing pages | httpx + CSS selectors + SQLite | Low overhead, fast execution, sufficient for infrequent changes | Near-zero infrastructure cost |
| E-commerce with JS rendering | playwright + headless Chromium | Handles dynamic DOM, lazy-loaded prices, and cart calculations | Higher CPU/memory, requires containerization |
| High-frequency monitoring (<1h intervals) | Proxy rotation + rate limiting + PostgreSQL | Prevents IP bans, supports concurrent writes, scales to millions of rows | Proxy subscriptions + managed DB costs |
| Enterprise BI integration | Pipeline + CSV/Parquet export + dbt transformations | Clean schema enables time-series analytics, elasticity modeling, and dashboarding | Engineering time for schema design & CI/CD |
Configuration Template
# pricing_targets.yaml
targets:
- identifier: "saas_basic_tier"
url: "https://competitor-a.com/pricing"
parser_type: "css"
selector: ".pricing-card.basic .price-amount"
source_domain: "competitor-a.com"
currency: "USD"
- identifier: "ecommerce_pro_bundle"
url: "https://competitor-b.com/plans"
parser_type: "regex"
selector: "Pro Bundle.*?\\$([\\d,]+\\.?\\d*)"
source_domain: "competitor-b.com"
currency: "USD"
engine:
threshold_pct: 0.03
timeout_seconds: 10
retries: 3
alert_window_hours: 24
storage:
db_path: "./data/price_intel.db"
notifications:
smtp_host: "smtp.gmail.com"
smtp_port: 465
sender: "alerts@yourdomain.com"
credentials_env: "SMTP_USER,SMTP_PASS"
Quick Start Guide
- Install dependencies:
pip install httpx beautifulsoup4 playwright
- Initialize browser binaries:
playwright install chromium (required only if using dynamic targets)
- Create configuration: Save the YAML template as
pricing_targets.yaml and populate with your targets
- Run the engine: Execute the orchestrator script. Verify logs show successful fetches, threshold checks, and alert dispatches
- Schedule execution: Add a cron entry (
0 */6 * * * /usr/bin/python3 /path/to/engine.py) or configure a systemd timer for automated cycles
This pipeline transforms pricing from a manual chore into a queryable, alert-driven intelligence layer. By decoupling fetch, parse, store, and notify, you gain observability, resilience, and a foundation for advanced analytics like elasticity modeling or automated rule-based pricing adjustments.