Polluxs.ForEach 0.1.2

dotnet add package Polluxs.ForEach --version 0.1.2
                    
NuGet\Install-Package Polluxs.ForEach -Version 0.1.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Polluxs.ForEach" Version="0.1.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Polluxs.ForEach" Version="0.1.2" />
                    
Directory.Packages.props
<PackageReference Include="Polluxs.ForEach" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Polluxs.ForEach --version 0.1.2
                    
#r "nuget: Polluxs.ForEach, 0.1.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Polluxs.ForEach@0.1.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Polluxs.ForEach&version=0.1.2
                    
Install as a Cake Addin
#tool nuget:?package=Polluxs.ForEach&version=0.1.2
                    
Install as a Cake Tool

<div align="center"> <img src="ForEach/logo.png" alt="ForEach Logo" width="150"/> </div>

ForEach

Make .NET concurrency simple.

Extension methods for parallel processing with fluent syntax. Built on Parallel.ForEachAsync with extras like result collection and per-key limits.

⚠️ Warning: This package is in active development and may introduce breaking changes between versions.

Quick Examples

Parallel processing with concurrency limit

// Process 100 URLs, max 20 at a time
await urls.ForEachParallelAsync(async (url, ct) =>
{
    var response = await httpClient.GetAsync(url, ct);
    Console.WriteLine($"{url}: {response.StatusCode}");
}, maxParallel: 20);

Collect results from parallel operations

// Download and collect all results
var results = await urls.ForEachParallelAsync(async (url, ct) =>
{
    var content = await httpClient.GetStringAsync(url, ct);
    return (url, content.Length);
}, maxParallel: 20);

foreach (var (url, size) in results)
    Console.WriteLine($"{url}: {size} bytes");

Per-key concurrency limits

// Max 50 total, but only 2 per customer
await orders.ForEachKeyParallelAsync(
    keySelector: order => order.CustomerId,
    body: async (order, ct) => await ProcessOrderAsync(order, ct),
    maxTotalParallel: 50,
    maxPerKey: 2);

Batch processing with parallel execution

// Process items in batches of 50, with max 5 batches running concurrently
await records.ForEachBatchParallelAsync(async (batch, ct) =>
{
    await database.BulkInsertAsync(batch, ct);
}, maxPerBatch: 50, maxConcurrent: 5);

All Methods

For IEnumerable<T>, IAsyncEnumerable<T>, and Channel<T>:

Method Purpose
ForEachParallelAsync Process items concurrently with a global limit
ForEachParallelAsync<T,TResult> Process items concurrently and collect results
ForEachKeyParallelAsync Process items with both global and per-key concurrency limits
ForEachBatchParallelAsync Process items in batches with parallel batch execution

For Channel<T> only:

Method Purpose
ForEachAsync Process items sequentially
ReadAllAsync Convert channel to IAsyncEnumerable<T>
WriteAllAsync Write items from IEnumerable<T> or IAsyncEnumerable<T> into channel

Cancellation Support

All methods support cancellation tokens at two levels:

1. Method-level cancellation (always available):

var cts = new CancellationTokenSource();
await items.ForEachParallelAsync(async item =>
{
    await ProcessAsync(item);
}, maxParallel: 10, ct: cts.Token);  // ← Pass CT here

2. Body-level cancellation (optional - use when your work needs it):

await items.ForEachParallelAsync(async (item, ct) =>  // ← CT parameter
{
    await ProcessAsync(item, ct);  // ← Pass to operations
}, maxParallel: 10);

Don't need cancellation? Just omit it:

// Simplest form - no cancellation token needed
await items.ForEachParallelAsync(async item =>
{
    await ProcessAsync(item);
}, maxParallel: 10);

Code examples for every method

ForEachParallelAsync

Run async operations for an enumerable with a concurrency limit.

using ForEach.Enumerable; // or ForEach.AsyncEnumerable

await files.ForEachParallelAsync(async (path, ct) =>
{
    var content = await File.ReadAllTextAsync(path, ct);
    var upper = content.ToUpperInvariant();
    await File.WriteAllTextAsync($"{path}.out", upper, ct);
}, maxParallel: 8);
  • Global cap via maxParallel
  • Honors cancellation
  • Exception behavior:
    • IEnumerable<T>: Aggregates via Parallel.ForEachAsyncAggregateException
    • IAsyncEnumerable<T>: Aggregates via Task.WhenAllAggregateException

ForEachParallelAsync (with results)

Process items concurrently and collect results.

var results = await urls.ForEachParallelAsync(async (url, ct) =>
{
    using var r = await httpClient.GetAsync(url, ct);
    return (url, r.StatusCode);
}, maxParallel: 16);

foreach (var (url, code) in results)
    Console.WriteLine($"{url} → {code}");
  • Output order is arbitrary (not guaranteed)
  • Works with any return type
  • Uses ConcurrentBag under the hood
  • Exception aggregation same as ForEachParallelAsync (inherits from Parallel.ForEachAsync)

ForEachKeyParallelAsync

Limit concurrency globally AND per key.

await jobs.ForEachKeyParallelAsync(
    keySelector: j => j.AccountId,
    body: async (job, ct) =>
    {
        await HandleJobAsync(job, ct);
    },
    maxTotalParallel: 64,
    maxPerKey: 2);
  • Global cap = maxTotalParallel (actual items being processed concurrently)
  • Per-key cap = maxPerKey (items per key being processed concurrently)
  • Effective per-key limit = min(maxTotalParallel, maxPerKey) - if maxPerKey > maxTotalParallel, the global limit wins
  • Uses bounded channel + per-key semaphores for efficient throttling
  • Enumerates source only once (no materialization required)
  • Aggregates exceptions via Task.WhenAll - multiple failures collected into an AggregateException

ForEachBatchParallelAsync

Process items in batches with concurrent batch execution.

using ForEach.Enumerable; // or ForEach.AsyncEnumerable

// Bulk insert records in batches
await records.ForEachBatchParallelAsync(async (batch, ct) =>
{
    await database.BulkInsertAsync(batch, ct);
    Console.WriteLine($"Inserted batch of {batch.Count} records");
}, maxPerBatch: 100, maxConcurrent: 4);
  • Items grouped into batches of up to maxPerBatch items
  • Up to maxConcurrent batches processed in parallel
  • Last batch may contain fewer items
  • Each batch provided as List<T> to body function
  • Useful for database bulk operations, API batch calls, or any scenario where processing N items together is more efficient
  • Uses bounded channel + producer-consumer pattern
  • Aggregates exceptions via Task.WhenAllAggregateException

ForEachAsync

Process channel items sequentially.

await channel.ForEachAsync(async (item, ct) =>
    await ProcessAsync(item, ct));

ReadAllAsync

Convert channel to IAsyncEnumerable<T>.

await foreach (var item in channel.ReadAllAsync())
    Process(item);

WriteAllAsync

Write items into channel.

var channel = Channel.CreateUnbounded<int>();

// From IEnumerable<T>
await channel.WriteAllAsync(Enumerable.Range(1, 100));

// From IAsyncEnumerable<T>
await channel.WriteAllAsync(FetchDataAsync());

channel.Writer.Complete();

License

MIT = copy, use, modify, ignore.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
  • net8.0

    • No dependencies.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.1.2 131 11/8/2025
0.1.1 131 11/8/2025
0.1.0 132 11/8/2025
0.0.3 140 11/8/2025
0.0.2 122 11/7/2025
0.0.1 111 11/7/2025