r cancellation propagation, and composition strategies.
Step 1: Define the Producer Contract
The producer must return IAsyncEnumerable<T> and use yield return inside an async method. The compiler generates a state machine that respects await points and supports cooperative cancellation.
public async IAsyncEnumerable<TelemetryRecord> StreamTelemetryAsync(
string endpoint,
[EnumeratorCancellation] CancellationToken ct = default)
{
using var httpClient = new HttpClient();
var response = await httpClient.GetAsync(endpoint, HttpCompletionOption.ResponseHeadersRead, ct);
response.EnsureSuccessStatusCode();
using var stream = await response.Content.ReadAsStreamAsync(ct);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
ct.ThrowIfCancellationRequested();
var line = await reader.ReadLineAsync(ct);
if (!string.IsNullOrWhiteSpace(line))
{
yield return JsonSerializer.Deserialize<TelemetryRecord>(line)!;
}
}
}
Key architectural decisions:
HttpCompletionOption.ResponseHeadersRead prevents buffering the entire response body.
[EnumeratorCancellation] binds the consumer's CancellationToken to the generated state machine.
yield return pauses execution until the consumer requests the next element, creating implicit backpressure.
Step 2: Implement the Consumer with await foreach
The consumer uses await foreach to iterate asynchronously. The compiler translates this into a while loop calling MoveNextAsync() and properly disposes the enumerator via IAsyncDisposable.
public async Task ProcessTelemetryAsync(
string endpoint,
CancellationToken ct = default)
{
await foreach (var record in StreamTelemetryAsync(endpoint, ct).WithCancellation(ct))
{
await ProcessRecordAsync(record, ct);
}
}
Note: .WithCancellation(ct) explicitly propagates cancellation. While await foreach respects cancellation tokens, chaining it explicitly ensures predictable behavior in composed pipelines.
Step 3: Compose Streams with LINQ Async
Synchronous LINQ operators materialize collections. For async streams, use System.Linq.Async (NuGet) which provides WhereAsync, SelectAsync, TakeAsync, and BufferAsync.
var filteredStream = StreamTelemetryAsync(endpoint, ct)
.WhereAsync(async r => await IsCriticalAsync(r, ct))
.SelectAsync(async r => await EnrichAsync(r, ct))
.BufferAsync(100); // Chunk for batch processing downstream
Step 4: Architecture Decisions and Rationale
- Pull-based vs Push-based: C# async streams are pull-based. The consumer controls the rate. This eliminates the need for explicit backpressure channels but requires consumers to process quickly enough to avoid producer buffering.
- Disposal Semantics:
await foreach automatically calls DisposeAsync() on the enumerator. Never manually dispose inside the loop unless breaking early.
- Error Boundaries: Exceptions thrown during enumeration bubble to the consumer. Wrap
await foreach in try/catch or use .CatchAsync() from System.Linq.Async for structured error handling.
- Buffering Strategy: Use
.BufferAsync(n) when downstream processors require batch efficiency (e.g., database bulk inserts) while preserving streaming semantics upstream.
Pitfall Guide
1. Materializing the Stream with .ToListAsync() or .ToArray()
Calling materialization operators defeats the entire purpose of async streams. The runtime allocates the full dataset in memory, nullifying O(1) memory benefits and introducing latency spikes. Use streaming operators exclusively unless explicit batch materialization is required downstream.
2. Ignoring Cancellation Propagation
Async streams that don't respect CancellationToken will continue consuming resources after the consumer aborts. This causes connection leaks, orphaned database cursors, and wasted CPU cycles. Always pass CancellationToken through the entire pipeline and use [EnumeratorCancellation] on producer parameters.
3. Blocking Async Streams with .Result or .Wait()
Synchronous blocking on async enumerators causes thread pool starvation and potential deadlocks in ASP.NET Core or WPF contexts. The CLR scheduler cannot reuse blocked threads, leading to queue saturation. Always use await or run blocking operations on dedicated threads via Task.Run if absolutely necessary.
4. Mixing Sync and Async Enumerators Incorrectly
Converting IAsyncEnumerable<T> to IEnumerable<T> via .ToBlockingEnumerable() forces synchronous iteration over async resources. This blocks threads on I/O and breaks cancellation semantics. Only use blocking conversions in console apps or background workers where thread pool impact is acceptable.
5. Assuming True Push-Based Backpressure
C# async streams are pull-based. If the consumer processes slowly, the producer continues generating elements until memory pressure forces GC. Unlike reactive streams (e.g., Rx.NET, Project Reactor), there's no built-in request n mechanism. Mitigate this by chunking (.BufferAsync), rate limiting, or implementing custom pacing logic in the consumer.
6. Improper Disposal of Async Enumerators
While await foreach handles disposal automatically, manual iteration using GetAsyncEnumerator() requires explicit await using or DisposeAsync() calls. Skipping disposal leaks underlying resources (HTTP connections, file handles, database readers). Always wrap manual enumeration in await using.
ConfigureAwait(false) is unnecessary in ASP.NET Core (no synchronization context) and can interfere with ambient context flow (e.g., IHttpContextAccessor, correlation IDs). Reserve it for library code targeting multiple hosts or high-throughput background services where context restoration adds measurable overhead.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|
| Bulk database export (>1M rows) | IAsyncEnumerable<T> + .BufferAsync(500) | Maintains O(1) memory, enables chunked inserts | Reduces container memory by 80-90% |
| Real-time telemetry ingestion | IAsyncEnumerable<T> + .SelectAsync() | Processes elements as they arrive, prevents queue buildup | Lowers latency by 40-60% |
| API pagination with client filtering | IEnumerable<T> (sync) | Small bounded datasets, simpler client integration | No infra cost change |
| gRPC server streaming | IAsyncEnumerable<T> | Native protocol support, automatic backpressure | Eliminates thread pool saturation |
| Legacy sync codebase migration | .ToBlockingEnumerable() (temporary) | Phased migration path, minimal refactoring | Increases memory temporarily, plan async transition |
Configuration Template
Production-ready async stream pipeline with cancellation, retry, buffering, and structured logging:
using System.Runtime.CompilerServices;
using System.Text.Json;
using Polly;
using Polly.Retry;
public class AsyncStreamPipeline<T>
{
private readonly AsyncRetryPolicy _retryPolicy;
private readonly ILogger _logger;
public AsyncStreamPipeline(ILogger logger, int maxRetries = 3)
{
_logger = logger;
_retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<IOException>()
.WaitAndRetryAsync(maxRetries, retry => TimeSpan.FromSeconds(Math.Pow(2, retry)));
}
public async IAsyncEnumerable<T> StreamWithResilienceAsync(
Func<CancellationToken, Task<IAsyncEnumerable<T>>> sourceFactory,
[EnumeratorCancellation] CancellationToken ct = default)
{
var attempts = 0;
while (!ct.IsCancellationRequested)
{
try
{
var source = await sourceFactory(ct);
await foreach (var item in source.WithCancellation(ct))
{
ct.ThrowIfCancellationRequested();
yield return item;
}
// Successful completion
yield break;
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
yield break;
}
catch (Exception ex)
{
attempts++;
_logger.LogWarning(ex, "Stream failure on attempt {Attempt}", attempts);
if (attempts >= _retryPolicy.PolicyState.MaxRetryAttempts)
throw;
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempts)), ct);
}
}
}
}
Quick Start Guide
- Install dependencies:
dotnet add package System.Linq.Async
- Create producer: Implement an
async IAsyncEnumerable<T> method with [EnumeratorCancellation] CancellationToken ct
- Implement consumer: Use
await foreach (var item in producer(ct).WithCancellation(ct)) inside a try/catch block
- Add composition: Chain
.WhereAsync(), .SelectAsync(), or .BufferAsync() from System.Linq.Async
- Validate: Run under load with
dotnet-counters or BenchmarkDotNet to confirm O(1) memory and stable thread pool queue depth