ParallelFileProcessor 0.1.0

dotnet add package ParallelFileProcessor --version 0.1.0
                    

ParallelFileProcessor

High-performance, allocation-aware, parallel line processor for very large text files.

Processes a file by splitting it into byte chunks (aligned so multi-byte characters are not broken) and spinning up multiple tasks that each parse lines (LF-separated, with optional CR) directly from pooled byte buffers. Supports UTF-8, UTF-16 LE/BE, and UTF-32 LE/BE via BOM detection (or an explicit encoding). No per-line string allocations unless you create them yourself.

Features

  • Parallel chunk processing with bounded task count
  • Zero-copy line slices (Span<byte>); the caller decides if/when to decode
  • BOM detection (UTF-8 / UTF-16 LE / UTF-16 BE / UTF-32 LE / UTF-32 BE)
  • Optional user context per task (created via an async factory)
  • Optional async finalize callback per task for aggregation / flushing
  • Adaptive (or user-supplied) chunk sizing; always 4-byte aligned
  • Manual early stop by returning false from the line callback
  • Uses ArrayPool<byte> to minimize GC pressure

Core Type

ParallelTextFileProcessor with main API:

Task<ProcessResult> ProcessAsync<T>(
    string filePath,
    Func<int, Span<byte>, T?, bool> processLine,
    Func<Task<T>>? contextFactory = null,
    Func<T, Task>? finalizeTask = null,
    Encoding? encoding = null,
    int numTasks = -1,
    int chunkSize = -1) where T : class

ProcessResult fields:

  • ParallelTasks: actual parallelism used
  • TotalTasks: total chunks scheduled (can exceed parallelism)
  • ChunkSize: bytes per chunk (4-byte aligned)
  • TotalLines: total processed lines
  • TotalBytes: file size

When To Use

  • Log analytics over multi-GB log files
  • ETL pre-processing (filtering, counting, bucketing)
  • Building indexes / statistics from raw text
  • Fast scanning for patterns before deeper parsing

Quick Start

# Assuming a recent .NET SDK is installed
dotnet add package ParallelFileProcessor --version 0.1.0

Minimal Example (count lines containing an error token)

using ParallelFileProcessor;
using System.Text;

var targetFile = @"/data/huge.log";
var needle = Encoding.UTF8.GetBytes("ERROR");
long errorCount = 0;

var result = await ParallelTextFileProcessor.ProcessAsync<object>(
    filePath: targetFile,
    processLine: (taskId, lineBytes, _ctx) =>
    {
        // simple contains check on raw bytes
        if (lineBytes.IndexOf(needle) >= 0)
        {
            Interlocked.Increment(ref errorCount);
        }
        return true; // continue
    }
);
Console.WriteLine(result); // shows stats
Console.WriteLine($"Lines with ERROR: {errorCount}; Total lines: {result.TotalLines}");

Using Context + Finalization

class TaskContext
{
    public int LocalMatches;
}

var utf8 = Encoding.UTF8;
var pattern = utf8.GetBytes("WARN");
int globalMatches = 0;
object locker = new();

var res = await ParallelTextFileProcessor.ProcessAsync<TaskContext>(
    filePath: "big.log",
    contextFactory: async () => new TaskContext(), // could await async init
    processLine: (taskId, line, ctx) =>
    {
        if (line.IndexOf(pattern) >= 0)
            ctx!.LocalMatches++;
        return true;
    },
    finalizeTask: async ctx =>
    {
        lock (locker) globalMatches += ctx.LocalMatches;
        await Task.CompletedTask;
    },
    encoding: null,      // auto-detect via BOM, else UTF-8
    numTasks: -1,        // auto (Environment.ProcessorCount)
    chunkSize: -1        // auto (fileSize/numTasks bounded)
);

Console.WriteLine($"Total WARN lines: {globalMatches}");

Collecting Per-Task Results

var count = 1_984_587; // an arbitrary, sufficiently large line count
var tempFile = Path.GetTempFileName();
var encoding = Encoding.GetEncoding("utf-16");
var testData = Enumerable.Range(0, count).ToArray();
using var cts = new CancellationTokenSource();
// write sequential integers, one per line
await File.WriteAllLinesAsync(tempFile, testData.Select(i => i.ToString()), encoding, cts.Token);

// global context list to hold all task contexts - must be synchronized!!
var contextList = new List<List<int>>();
// factory to create a new context for each task
List<int> factory()
{
    var list = new List<int>(count);
    lock (contextList) // MUST lock as this may be called concurrently
        contextList.Add(list);
    return list;
}
// process the file
var processResult = await ParallelTextFileProcessor.ProcessAsync<List<int>>(
    filePath: tempFile,
    processLine: (batchNumber, line, context) =>
    {
        var s = encoding.GetString(line); // this could be avoided if we processed bytes directly
        var value = int.Parse(s);
        context!.Add(value); // no locking needed: each context is task-local
        return true;
    },
    contextFactory: () => Task.FromResult(factory()),
    numTasks: -1,
    encoding: encoding,
    chunkSize: -1 // auto size
);
Console.WriteLine($"Processed {processResult.TotalLines} lines");
Console.WriteLine($"Process result: {processResult}");
// note: the items may be out of order across contexts
var numbers = contextList.SelectMany(l => l).Order().ToList();
// do more stuff with the numbers...

Early Stop

Return false in processLine when you have found enough matches.
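A sketch (the file name and the FATAL token are illustrative, reusing the API shown above):

```csharp
using ParallelFileProcessor;
using System.Text;
using System.Threading;

var needle = Encoding.UTF8.GetBytes("FATAL");
var found = 0;

await ParallelTextFileProcessor.ProcessAsync<object>(
    filePath: "big.log",
    processLine: (taskId, line, _) =>
    {
        if (line.IndexOf(needle) >= 0)
        {
            Interlocked.Exchange(ref found, 1);
            return false; // stop processing early
        }
        return true; // keep scanning
    });

Console.WriteLine(found == 1 ? "match found" : "no match");
```

Because chunks run concurrently, other tasks may still process some lines after one callback returns false.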

Accessing Text (Decoding)

To convert a Span<byte> line to string only when needed:

var text = utf8.GetString(lineBytes);

Avoid decoding every line if you can operate on bytes directly.
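For instance, prefix and substring checks can stay at the byte level using standard `MemoryExtensions` methods (the sample line here is illustrative):

```csharp
using System;
using System.Text;

ReadOnlySpan<byte> lineBytes = Encoding.UTF8.GetBytes("ERROR disk full");
ReadOnlySpan<byte> prefix = Encoding.UTF8.GetBytes("ERROR");

// Prefix test without allocating a string
bool isError = lineBytes.StartsWith(prefix);

// Substring search on raw bytes (u8 literal gives a ReadOnlySpan<byte>)
bool hasDisk = lineBytes.IndexOf("disk"u8) >= 0;

Console.WriteLine($"{isError} {hasDisk}"); // True True
```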

Encoding Notes

  • If no encoding is supplied, the first 4 bytes are inspected for known BOMs; default = UTF-8 without BOM
  • Lines are split on \n (LF); a preceding \r (CR) from CRLF endings is trimmed
  • Chunk boundaries are moved backward to the previous LF so multi-byte characters are never split
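The BOM check amounts to inspecting the first bytes of the file. A standalone sketch of the idea (not the library's actual `DetectEncoding` implementation):

```csharp
using System;
using System.Text;

static Encoding SniffBom(ReadOnlySpan<byte> head)
{
    // Check 4-byte BOMs first: UTF-32 LE begins with the UTF-16 LE BOM bytes
    if (head.StartsWith(new byte[] { 0xFF, 0xFE, 0x00, 0x00 })) return new UTF32Encoding(false, true); // UTF-32 LE
    if (head.StartsWith(new byte[] { 0x00, 0x00, 0xFE, 0xFF })) return new UTF32Encoding(true, true);  // UTF-32 BE
    if (head.StartsWith(new byte[] { 0xEF, 0xBB, 0xBF })) return Encoding.UTF8;                        // UTF-8
    if (head.StartsWith(new byte[] { 0xFF, 0xFE })) return Encoding.Unicode;                           // UTF-16 LE
    if (head.StartsWith(new byte[] { 0xFE, 0xFF })) return Encoding.BigEndianUnicode;                  // UTF-16 BE
    return Encoding.UTF8; // no BOM: default to UTF-8
}

Console.WriteLine(SniffBom(new byte[] { 0xEF, 0xBB, 0xBF, (byte)'h', (byte)'i' }).WebName); // utf-8
```

Ordering matters here: the UTF-32 LE BOM starts with the same two bytes as the UTF-16 LE BOM, so the longer patterns must be tested first.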

Sizing & Performance

  • Default chunk size ≈ fileSize / numTasks (capped by MaxArraySize); always a multiple of 4
  • Tune chunkSize manually for scenarios with extremely skewed line lengths
  • Larger chunks = fewer tasks, lower scheduling overhead
  • Too large chunks can reduce parallelism if lines are sparse near boundaries
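As a rough sketch of the auto-sizing rule above (the library's actual `RecommendedChunkSize` and `PreviousMultipleOf4` may differ in detail):

```csharp
using System;

static int PreviousMultipleOf4(int n) => n & ~3; // round down to a multiple of 4

static int SuggestChunkSize(long fileSize, int numTasks, int maxArraySize)
{
    // Aim for roughly one chunk per task, clamped to the pool's max array size,
    // then align down to 4 bytes so UTF-16/UTF-32 code units are never split
    long perTask = fileSize / Math.Max(1, numTasks);
    int clamped = (int)Math.Min(perTask, maxArraySize);
    return Math.Max(4, PreviousMultipleOf4(clamped));
}

Console.WriteLine(SuggestChunkSize(1_000_000, 8, int.MaxValue)); // 125000
```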

Thread Safety

  • processLine runs concurrently on multiple tasks
  • finalizeTask may run concurrently for different contexts
  • Use synchronization for shared state (Interlocked, lock, concurrent collections)

Limitations / TODO

  • Only BOM-detectable encodings supported
  • No built-in backpressure if finalizeTask is slow
  • Does not presently expose a cancellation token
  • No direct support for multi-line record formats
  • Could expose pluggable line-delimiter logic / multi-character terminators

Error Handling

Exceptions inside a task propagate via Task.WhenAll and fault the overall ProcessAsync call; pooled buffers are still returned to the pool in finally blocks.
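That propagation behavior mirrors plain `Task.WhenAll`; a self-contained illustration:

```csharp
using System;
using System.Threading.Tasks;

var ok = Task.Run(() => 42);
var bad = Task.Run((Func<int>)(() => throw new InvalidOperationException("chunk failed")));

string caught = "";
try
{
    await Task.WhenAll(ok, bad);
}
catch (InvalidOperationException ex)
{
    caught = ex.Message; // await rethrows the first faulted task's exception, unwrapped
}
Console.WriteLine($"caught: {caught}");
```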

API Helpers

  • DetectEncoding(Span<byte>)
  • GetBytesToSkipForEncoding(Encoding, Span<byte>)
  • RecommendedTasks(int)
  • RecommendedChunkSize(long, int, int)
  • PreviousMultipleOf4(int)
  • GetFileLength(string)

Return Semantics

TotalTasks can exceed ParallelTasks because the file may require more chunks than the number of concurrently active tasks.
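Given a chunk size, the scheduled chunk count follows directly from the file size; a sketch of the relationship (assumed, not the library's exact formula):

```csharp
using System;

static long TotalChunks(long fileSize, int chunkSize) =>
    (fileSize + chunkSize - 1) / chunkSize; // ceiling division

// A 10 MB file in 3 MB chunks needs 4 chunks, even if only
// Environment.ProcessorCount tasks run at once
Console.WriteLine(TotalChunks(10_000_000, 3_000_000)); // 4
```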

Benchmarking (Suggested)

Use BenchmarkDotNet or .NET Trace/PerfView to measure real workloads (not included yet).

Contributing

Issues + PRs welcome: tests, cancellation support, custom delimiter strategies, streaming decode helpers.

License

MIT (add SPDX header / file if not already present).

Disclaimer

Target framework set to .NET 10 (preview at time of writing). Adjust if your environment lacks latest SDK.

Target Frameworks

  • net9.0 (included in package; no dependencies)
  • Compatible: net9.0 and net10.0, plus their computed platform-specific variants (android, browser, ios, maccatalyst, macos, tvos, windows)


Version History

  • 0.1.0 (159 downloads, last updated 9/20/2025): initial preview