Streaming Large Datasets with IAsyncEnumerable in MicroMediator

Loading everything into memory before processing is a habit. It works until it doesn't.

The moment your dataset grows beyond what fits comfortably in a single allocation, you pay for it through GC pressure, latency spikes, and occasionally an OutOfMemoryException at an inconvenient time. The fix is not to allocate more. It is to stop holding everything at once.

MicroMediator has first-class support for streaming via IAsyncEnumerable<T>. This post covers how it works, when to use it, and what the real numbers look like.


How streaming works in MicroMediator

Streaming uses a separate request type alongside the standard IRequest<T>.

// Standard request — loads everything, returns it all
public record GetProductsQuery : IRequest<List<Product>>;

// Streaming request — yields one item at a time
public record StreamProductsQuery : IStreamRequest<Product>;

The handler implements IStreamRequestHandler<TRequest, TResponse> and returns IAsyncEnumerable<T>:

using System.Runtime.CompilerServices; // for [EnumeratorCancellation]

public class StreamProductsHandler 
    : IStreamRequestHandler<StreamProductsQuery, Product>
{
    private readonly IProductRepository _repository;

    public StreamProductsHandler(IProductRepository repository)
    {
        _repository = repository;
    }

    public async IAsyncEnumerable<Product> HandleAsync(
        StreamProductsQuery request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var product in _repository.StreamAllAsync(cancellationToken))
        {
            yield return product;
        }
    }
}

Register it alongside your standard handlers:

builder.Services
    .AddMediator()
    .AddSingletonStreamHandler<StreamProductsHandler>()
    .AddScopedHandler<CreateOrderCommandHandler>();

Consume it with await foreach:

await foreach (var product in _mediator.StreamAsync(new StreamProductsQuery()))
{
    // Process one product at a time
    await ProcessProduct(product);
}

The lifetime model is the same as standard handlers. If your streaming handler depends on DbContext, use AddScopedStreamHandler. If it is stateless, use AddSingletonStreamHandler. The handler lifetimes post covers the reasoning in detail.
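As a sketch of the scoped case — assuming a hypothetical EF Core AppDbContext with a Products DbSet — a streaming handler over a database query looks like this. AsNoTracking and AsAsyncEnumerable are real EF Core APIs; the handler and context names are illustrative:

```csharp
public class StreamProductsFromDbHandler
    : IStreamRequestHandler<StreamProductsQuery, Product>
{
    private readonly AppDbContext _db; // scoped dependency

    public StreamProductsFromDbHandler(AppDbContext db) => _db = db;

    public async IAsyncEnumerable<Product> HandleAsync(
        StreamProductsQuery request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // AsAsyncEnumerable streams rows as they arrive from the database
        // instead of materialising the whole result set first
        await foreach (var product in _db.Products
            .AsNoTracking()
            .AsAsyncEnumerable()
            .WithCancellation(cancellationToken))
        {
            yield return product;
        }
    }
}
```

Registered with a scoped lifetime so the DbContext is created per request:

```csharp
builder.Services
    .AddMediator()
    .AddScopedStreamHandler<StreamProductsFromDbHandler>();
```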


When streaming makes sense

Streaming is not always the right choice. It adds complexity and in some scenarios runs slower than loading everything at once.

Use streaming when:

  • You are processing more items than you need to hold in memory simultaneously
  • You want early exit — processing stops as soon as a condition is met, no wasted work
  • Your data source already supports streaming (EF Core's AsAsyncEnumerable(), Azure Blob Storage, etc.)
  • You are aggregating or transforming, not collecting

Load everything when:

  • You need random access to items by index
  • The result set is small and bounded
  • You need to sort or group the entire dataset before processing
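The "aggregating, not collecting" case is worth a concrete sketch. Reusing the StreamProductsQuery from above — and assuming the benchmark's integer stock field is named StockQuantity, which is illustrative — a running total never holds more than one product at a time:

```csharp
// Total stock across the catalogue without materialising a list —
// memory use stays constant no matter how many products stream past.
var totalStock = 0;
await foreach (var product in _mediator.StreamAsync(new StreamProductsQuery()))
{
    totalStock += product.StockQuantity;
}
```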

What the benchmarks actually show

All benchmarks run on .NET 10.0, Intel Core Ultra 7 165H, BenchmarkDotNet v0.15.8. The product streaming benchmarks use realistic objects with multiple string fields, a decimal price, and an integer stock quantity — not trivial integers.

Streaming vs loading: the full picture

Method                Items    Mean         Allocated
LoadAll               100      55.87 μs     29.98 KB
Stream to list        100      80.77 μs     30.42 KB
Stream process        100      80.04 μs     29.60 KB
Stream take first 50  100      47.81 μs     15.13 KB
LoadAll               1,000    455.43 μs    297.17 KB
Stream to list        1,000    461.48 μs    297.61 KB
Stream process        1,000    471.45 μs    289.76 KB
Stream take first 50  1,000    47.06 μs     15.13 KB
LoadAll               5,000    2,098 μs     1,484.67 KB
Stream to list        5,000    2,269 μs     1,485.11 KB
Stream process        5,000    2,318 μs     1,446.01 KB
Stream take first 50  5,000    32.73 μs     15.13 KB

A few things stand out here that are worth being direct about.

Streaming to a list is slower than loading everything. The IAsyncEnumerable overhead adds roughly 8-10% when you collect every item anyway. If your goal is to load all results and return them, streaming is the wrong tool.

Streaming to process items is also slightly slower than load-all at small counts, but the memory profile improves. At 1,000 items, Stream process uses 289.76KB versus LoadAll at 297.17KB. At 5,000 items, processing with a stream uses 1,446.01KB versus 1,484.67KB loaded. The gap is small because the product objects themselves are what takes memory, not the collection structure.

The number that matters is Stream take first 50.

At 5,000 items it runs in 32.73μs and allocates 15.13KB. LoadAll takes 2,098μs and allocates 1,484KB. That is 64x faster and 98% less memory, regardless of total dataset size.

This is the streaming case worth designing for: you do not need all the items. You need the first N that match a condition.

Early exit at scale

The stress benchmarks push this further with 100,000 and 1,000,000 item streams.

Method                    Stream size  Mean       Allocated
Early exit (after 1,000)  100,000      31.34 μs   728 B
Early exit (after 1,000)  1,000,000    32.54 μs   728 B
Complete processing       100,000      2,753 μs   784 B
Complete processing       1,000,000    29,132 μs  784 B

Early exit at 100,000 items takes the same time as early exit at 1,000,000 items: around 32μs. The stream stops the moment you break out of the loop. Total dataset size is irrelevant to cost when you exit early.

Memory stays constant regardless of stream size. 728 bytes to process the first 1,000 items from a 1,000,000 item stream.
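The early-exit pattern the benchmark measures is just a counter and a break — a minimal sketch, reusing the StreamProductsQuery and ProcessProduct from earlier:

```csharp
// Take the first 1,000 items from a stream of any size.
// Breaking out disposes the enumerator, which stops the producer.
var processed = 0;
await foreach (var product in _mediator.StreamAsync(new StreamProductsQuery()))
{
    await ProcessProduct(product);
    if (++processed >= 1_000) break;
}
```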


Search results are a common case. You rarely need all matching products — you need the first page.

public record SearchProductsQuery : IStreamRequest<Product>
{
    public string SearchTerm { get; init; } = string.Empty;
    public int PageSize { get; init; } = 20;
}

public class SearchProductsHandler 
    : IStreamRequestHandler<SearchProductsQuery, Product>
{
    private readonly IProductRepository _repository;

    public SearchProductsHandler(IProductRepository repository)
    {
        _repository = repository;
    }

    public async IAsyncEnumerable<Product> HandleAsync(
        SearchProductsQuery request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var product in _repository.StreamByTermAsync(
            request.SearchTerm, cancellationToken))
        {
            yield return product;
        }
    }
}

// In your controller or service
var results = new List<Product>();
await foreach (var product in _mediator.StreamAsync(
    new SearchProductsQuery { SearchTerm = "laptop", PageSize = 20 }))
{
    results.Add(product);
    if (results.Count >= 20) break;
}

The handler streams everything it finds. The consumer exits after 20 items. No loading a full result set. No wasted work beyond item 20.

LINQ's Take works too if you prefer. On .NET 10 the async LINQ operators ship in the base libraries; on earlier targets, add the System.Linq.Async package.

var results = await _mediator
    .StreamAsync(new SearchProductsQuery { SearchTerm = "laptop" })
    .Take(20)
    .ToListAsync();

Cancellation

Streaming respects CancellationToken throughout. Pass it to StreamAsync and check it in your handler.

await foreach (var product in _mediator.StreamAsync(query, cancellationToken))
{
    await ProcessProduct(product);
}

The [EnumeratorCancellation] attribute on the handler parameter ensures the token propagates correctly through the IAsyncEnumerable state machine. The stress benchmarks include a cancellation scenario: cancelling at 50,000 items from a 1,000,000 item stream costs 1,666μs and 3,760 bytes. Clean and predictable.
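Tying a stream to a deadline is one common shape. A sketch, assuming a five-second budget and the query and ProcessProduct from earlier:

```csharp
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

try
{
    await foreach (var product in _mediator.StreamAsync(query, cts.Token))
    {
        await ProcessProduct(product);
    }
}
catch (OperationCanceledException)
{
    // The stream stops where it was; items already processed are unaffected.
}
```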


What comes next

The next post looks at when to choose a lightweight mediator, covering where different libraries make different trade-offs and when those trade-offs matter.

The package is available on NuGet: TechnicalDogsbody.MicroMediator. Source and benchmarks are on GitHub.

Andy Blyth

Andy Blyth, an Optimizely MVP (OMVP) and Technical Architect at 26 DX with a keen interest in martial arts, occasionally ventures into blogging when memory serves.
