t key_arguments.
reroute_on_concurrency_control=True preserves all work at the cost of ~50% longer execution time due to queue cycling.
reroute_on_concurrency_control=False achieves the lowest latency by dropping redundant work, ideal for idempotent or latest-state-only operations.
registration_concurrency=KEYS prevents queue bloat by collapsing duplicate enqueues before worker assignment, optimizing resource utilization for high-frequency triggers.
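The trade-off between rerouting and dropping can be made concrete with a small, library-free model. This is a sketch, not Pynenc code: `simulate` and `Outcome` are hypothetical names. It assumes every call holds its key for the same fixed time and that a rerouted call runs as soon as the key frees up, so it ignores the queue-cycling overhead of a real reroute.

```python
from dataclasses import dataclass, field


@dataclass
class Outcome:
    executed: list[str] = field(default_factory=list)  # ops that actually ran
    dropped: list[str] = field(default_factory=list)   # ops discarded by the policy
    finished_at: float = 0.0                            # time the last op completed


def simulate(arrivals: list[tuple[float, str]], hold: float, reroute: bool) -> Outcome:
    """Model a single account key where each op holds the key for `hold` seconds.

    reroute=True:  a blocked op waits and runs once the key is free.
    reroute=False: a blocked op is dropped.
    """
    out = Outcome()
    key_free_at = 0.0
    for arrived, op in sorted(arrivals):
        if arrived >= key_free_at:
            start = arrived          # key is free, run immediately
        elif reroute:
            start = key_free_at      # key is busy, run after the current holder
        else:
            out.dropped.append(op)   # key is busy, discard the op
            continue
        key_free_at = start + hold
        out.executed.append(op)
        out.finished_at = key_free_at
    return out


burst = [(0.0, "debit"), (0.1, "credit"), (0.2, "refresh")]
kept = simulate(burst, hold=1.0, reroute=True)
lossy = simulate(burst, hold=1.0, reroute=False)
print(kept.executed, kept.finished_at)
print(lossy.executed, lossy.dropped, lossy.finished_at)
```

In this model the rerouting policy runs all three operations back to back, while the dropping policy finishes after the first one and discards the rest.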
Core Solution
The solution relies on Pynenc's built-in concurrency control engine, which tracks running invocations and their arguments in-process. By configuring running_concurrency=Mode.KEYS and specifying key_arguments, the orchestrator enforces one in-flight invocation per account key while allowing full parallelism across different accounts.
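The "one in-flight invocation per key" rule can be illustrated with a tiny in-memory guard. This is only a sketch of the idea, not how Pynenc implements it: `KeyedGate` and its methods are hypothetical names, and the state lives in a plain Python set.

```python
import threading


class KeyedGate:
    """Allow at most one holder per key; different keys never block each other."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._running: set[str] = set()

    def try_acquire(self, key: str) -> bool:
        """Return True and mark the key as running, or False if it is already held."""
        with self._lock:
            if key in self._running:
                return False
            self._running.add(key)
            return True

    def release(self, key: str) -> None:
        """Mark the key as free again; releasing an unheld key is a no-op."""
        with self._lock:
            self._running.discard(key)


gate = KeyedGate()
first = gate.try_acquire("acct-1")   # key is free
second = gate.try_acquire("acct-1")  # same key is already in flight
other = gate.try_acquire("acct-2")   # a different key is unaffected
gate.release("acct-1")
retry = gate.try_acquire("acct-1")   # free again after release
print(first, second, other, retry)
```

What happens when `try_acquire` returns False is the policy decision: requeue the work (reroute) or discard it (drop).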
1. External Provider Simulation (API Server)
The mock API tracks in-flight requests per account and records collisions when overlapping calls occur.
# api_server.py: the part that matters
@app.post("/call/{account_id}/{op}")
async def call(account_id: str, op: str, hold: float = HOLD_SECONDS) -> dict[str, str]:
    async with lock:
        acc = accounts[account_id]
        acc.calls += 1
        collided = acc.in_flight > 0
        acc.collisions += int(collided)
        acc.in_flight += 1
    print(f" [{'COLLISION' if collided else 'ok '}] {account_id:<8} {op}", flush=True)
    await asyncio.sleep(hold)
    async with lock:
        accounts[account_id].in_flight -= 1
    return {"outcome": "collision" if collided else "ok"}
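The collision bookkeeping can be exercised without a web server. The sketch below reuses the same counters in a plain asyncio coroutine; the `Account` dataclass and the `demo` function are assumptions made for illustration, since the excerpt above does not show how `accounts` and `lock` are defined.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Account:
    calls: int = 0
    in_flight: int = 0
    collisions: int = 0


async def demo() -> dict[str, Account]:
    accounts: defaultdict[str, Account] = defaultdict(Account)
    lock = asyncio.Lock()

    async def call(account_id: str, hold: float) -> str:
        async with lock:
            acc = accounts[account_id]
            acc.calls += 1
            collided = acc.in_flight > 0   # another call is still holding this account
            acc.collisions += int(collided)
            acc.in_flight += 1
        await asyncio.sleep(hold)          # simulate the provider doing work
        async with lock:
            accounts[account_id].in_flight -= 1
        return "collision" if collided else "ok"

    # Two overlapping calls for "alice" and one concurrent call for "bob".
    await asyncio.gather(call("alice", 0.05), call("alice", 0.05), call("bob", 0.05))
    # Two strictly sequential calls for "carol".
    await call("carol", 0.01)
    await call("carol", 0.01)
    return dict(accounts)


stats = asyncio.run(demo())
for name, acc in stats.items():
    print(name, acc)
```

Only the overlapping calls on the same account register a collision; concurrent calls on different accounts and sequential calls on the same account do not.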
2. Task Configuration & Concurrency Policies
The Pynenc app defines four tasks with declarative decorators: one unguarded baseline and three concurrency policies. Configuration (SQLite backend, thread runner, logging) lives in the same module as the task definitions.
# tasks.py
import os

import httpx
from pynenc import PynencBuilder
from pynenc.conf.config_task import ConcurrencyControlType as Mode

API_URL = "http://127.0.0.1:8765"

app = (
    PynencBuilder()
    .app_id("concurrency_demo")
    .sqlite("concurrency_demo.db")
    .thread_runner(min_threads=1, max_threads=8)
    .logging_stream("stdout")
    .logging_level(os.environ.get("DEMO_LOG_LEVEL", "info"))
    .max_pending_seconds(3.0)
    .build()
)


def _hit(account_id: str, op: str, hold: float | None = None) -> str:
    params = {"hold": hold} if hold is not None else None
    r = httpx.post(f"{API_URL}/call/{account_id}/{op}", params=params, timeout=10.0)
    r.raise_for_status()
    return r.json()["outcome"]


@app.task
def call_unsafe(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=True,
)
def call_keyed(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=False,
)
def call_keyed_drop(account_id: str, op: str) -> str:
    return _hit(account_id, op)


@app.task(
    running_concurrency=Mode.KEYS,
    registration_concurrency=Mode.KEYS,
    key_arguments=("account_id",),
    reroute_on_concurrency_control=True,
)
def refresh_once(account_id: str) -> str:
    return _hit(account_id, "refresh")
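To see what `key_arguments=("account_id",)` implies, it helps to picture how a concurrency key could be derived from a call. The helper below is a hypothetical sketch, not Pynenc's implementation: it assumes the key is built only from the named arguments, compared by their `repr`, and it uses a plain function in place of the decorated task.

```python
import inspect
from typing import Any, Callable


def concurrency_key(
    func: Callable[..., Any],
    key_arguments: tuple[str, ...],
    *args: Any,
    **kwargs: Any,
) -> tuple[tuple[str, str], ...]:
    """Build a key from the selected arguments only (hypothetical helper)."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return tuple((name, repr(bound.arguments[name])) for name in key_arguments)


def call_keyed(account_id: str, op: str) -> str:
    """Plain stand-in for the decorated task, used only to read its signature."""
    return f"{account_id}:{op}"


k1 = concurrency_key(call_keyed, ("account_id",), "alice", "debit")
k2 = concurrency_key(call_keyed, ("account_id",), op="credit", account_id="alice")
k3 = concurrency_key(call_keyed, ("account_id",), "bob", "debit")
print(k1 == k2, k1 == k3)
```

Because `op` is not part of the key, a debit and a credit on the same account share one key and are serialized, while calls for a different account get a different key and can run in parallel.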
3. Execution & Observability
The system supports both interactive exploration and CI/CD automation. The built-in monitor provides real-time invocation timelines showing REROUTED and CONCURRENCY_CONTROLLED_FINAL states.
# four terminals (recommended for exploring)
uv run uvicorn api_server:app --port 8765 # 1. API
uv run pynenc --app tasks.app runner start # 2. worker
uv run pynenc monitor # 3. monitor (optional) at http://127.0.0.1:8000
uv run python enqueue.py all # 4. enqueue scenarios
# one command (recommended for CI)
uv run python sample.py
Architecture Decisions:
- In-process state tracking eliminates external lock dependencies, reducing latency and operational blast radius.
- SQLite backend provides durable queue state without requiring Redis/PostgreSQL for demo/lightweight production workloads.
- Thread runner (min_threads=1, max_threads=8) balances concurrency limits with OS thread overhead, scaling dynamically based on key availability.
Pitfall Guide
- Inconsistent Key Serialization: key_arguments must serialize to identical values across invocations. If account_id is passed as different types (e.g., int vs str) or contains non-deterministic fields, the concurrency guard will fail to match keys, allowing parallel execution.
- Misconfigured reroute_on_concurrency_control: Setting True in high-contention scenarios causes queue thrashing and increased latency. Setting False when business logic requires all operations results in silent task drops (CONCURRENCY_CONTROLLED_FINAL) and unhandled KeyError exceptions.
- Ignoring Registration vs Execution Concurrency: Failing to use registration_concurrency=KEYS allows duplicate tasks to flood the queue before worker assignment. This wastes memory and CPU cycles on tasks that will ultimately be dropped or rerouted.
- Thread Pool Sizing Mismatch: Configuring max_threads significantly higher than the number of active account keys leads to idle threads and context-switching overhead. Conversely, too few threads starve available keys, negating parallelism benefits.
- Assuming External Idempotency: Per-account concurrency control does not guarantee external system idempotency. If the provider lacks deduplication keys or optimistic locking, even serialized calls may cause state drift if retries occur without explicit request IDs.
- State Reconciliation Gaps in Drop Mode: When using reroute_on_concurrency_control=False, developers often forget to handle CONCURRENCY_CONTROLLED_FINAL outcomes. Failing to catch or log these states breaks downstream monitoring and alerting pipelines.
- Lock Service Migration Blind Spots: Teams migrating from Redis/etcd locks often attempt to replicate exact lock TTL semantics. Pynenc's concurrency control is stateful and event-driven, not time-based. Relying on TTLs for cleanup will cause deadlocks or stale key locks.
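The key-serialization pitfall above is cheap to guard against at the enqueue boundary. The sketch below assumes, as that pitfall describes, that keys are matched on the serialized argument value; `normalize_account_id` is a hypothetical helper, not part of Pynenc.

```python
def normalize_account_id(account_id: object) -> str:
    """Coerce an account identifier to one canonical string form."""
    text = str(account_id).strip()
    if not text:
        raise ValueError("account_id must not be empty")
    return text


# The same logical account, as it might arrive from different callers.
raw_keys = [42, "42", " 42 "]

# Compared by serialized form, these look like three different keys.
distinct_before = {repr(k) for k in raw_keys}

# After normalization they collapse to a single key.
distinct_after = {normalize_account_id(k) for k in raw_keys}

print(len(distinct_before), len(distinct_after))
```

Calling such a helper before every enqueue keeps all invocations for one account on the same key, whatever type the caller supplied.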
Deliverables
- Concurrency Control Blueprint: Architecture diagram and decision matrix for selecting reroute_on_concurrency_control=True, reroute_on_concurrency_control=False, or registration_concurrency=KEYS based on workload characteristics (idempotent vs. stateful, high vs. low contention).
- Pre-Deployment Checklist: Validation steps for key argument consistency, thread pool sizing, external API idempotency verification, and monitoring alert thresholds for REROUTED/CONCURRENCY_CONTROLLED_FINAL states.
- Configuration Templates: Production-ready PynencBuilder setups for SQLite, PostgreSQL, and Redis backends, including environment variable mappings, logging configurations, and CI/CD integration scripts.
- Observability Dashboard: Pre-configured Pynmon timeline filters and log parsers to track per-account serialization latency, collision rates, and queue churn in real time.