EagerAsyncEnumerable 1.0.1
dotnet add package EagerAsyncEnumerable --version 1.0.1
NuGet\Install-Package EagerAsyncEnumerable -Version 1.0.1
<PackageReference Include="EagerAsyncEnumerable" Version="1.0.1" />
paket add EagerAsyncEnumerable --version 1.0.1
#r "nuget: EagerAsyncEnumerable, 1.0.1"
// Install EagerAsyncEnumerable as a Cake Addin #addin nuget:?package=EagerAsyncEnumerable&version=1.0.1 // Install EagerAsyncEnumerable as a Cake Tool #tool nuget:?package=EagerAsyncEnumerable&version=1.0.1
EagerAsyncEnumerable
Provides an implementation of IAsyncEnumerable<T> that eagerly enumerates over its supplied source, internally queueing all results, before an item is even requested from it.
Why would you want this?
Say you've got an IAsyncEnumerable
, and each iteration over it takes a long time to complete.
Meanwhile, your consumer of these items also operates asynchronously, and it also takes a long time
to complete.
The idea is, while your consumer is busy processing a result returned from the source
IAsyncEnumerable
, why not go ahead and produce the next item from your source IAsyncEnumerable
at the same time? You can have items queued and immediately ready the moment the consumer is
available to process another item. EagerAsyncEnumerable makes it easy to do this without
needing to manually stickhandle the tasks and the queue.
One real-world example could be making a series of API calls, and with each, saving the data to a database. The API calls could be slow, the database processing could also be slow. So, while doing the database processing, why not start up that next API call at the same time?
Example
Say you have an IAsyncEnumerable
like this:
private async IAsyncEnumerable<int> ProduceInts()
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(100);
yield return i;
}
}
And your consumer is like this:
private async Task ConsumeInt(int i)
{
await Task.Delay(50);
Console.Write($"Value {i} saved to database.");
}
So, it takes 100ms to produce an int, and 50ms to consume one of those ints.
Let's say you do your processing like this:
private async Task ProcessInts()
{
var myInts = ProduceInts();
await foreach (var item in myInts)
{
await ConsumeInt(item);
}
}
The problem with this is that all the awaiting is done one after the other. You'll produce an int, which takes 100ms, and then you'll process it, which takes 50 ms. Then you await the next, which takes 100ms, and you process that, which takes 50ms, etc etc.
The total time spent would be (approx) 1500ms. (1000ms for producing the ints, plus an extra 500ms for processing them).
Considering those two methods have nothing to do with one another, why not start producing another int while the consumer is working?
Now, you could achieve this by manually grabbing the enumerator and stickhandling the tasks
yourself, but wouldn't it be nice if you could just do the same await foreach
as above and just
have it work?
That's what EagerAsyncEnumerable provides.
private async Task ProcessInts()
{
var myInts = ProduceInts().AsEagerEnumerable();
await foreach (var item in myInts)
{
await ConsumeInt(item);
}
}
This time, the moment your enumeration starts (as part of the await foreach
),
EagerAsyncEnumerable will start enumerating over the entire source ProduceInts
, in a
background thread, queueing all the results before they are requested. That leaves this thread
free to consume items at the same time this loading occurs. Thus, each time you iterate over this
await foreach
, EagerAsyncEnumerable can immediately return, synchronously, whatever
it has queued, potentially saving a bunch of time since the consumer won't have to wait. Of course,
if nothing is queued, it would wait for the producer, no different than if you weren't using
EagerAsyncEnumerable.
In essence, the producer ProduceInts
runs in parallel with the consumer ConsumeInt
.
This operation will only take (approx) 1050ms total, as all the ConsumeInt
methods (except
the last) will run at the same time as the ProduceInts
method is providing another int.
How does this differ from Parallel.ForEach
?
When you use Parallel.ForEach, you are asking to process all the items in the source sequence at the same time, in-parallel.
EagerAsyncEnumerable, meanwhile, will still only process, at most, one item from the source sequence at a time. It merely does this enumeration in the background before the consumer requests it.
Catches
Catch #1 - Items in your IAsyncEnumerable need to be processed one-at-a-time, in order.
EagerAsyncEnumerable is intended to be used with a simple await foreach
loop, where you take
one item at a time. If you can operate all those tasks at the same time, in parallel, you will get
better performance out of
Parallel.ForEach,
or Tasks.WhenAll.
Catch #2 - Your producer does not depend on the consumer in order to run to completion.
Since EagerAsyncEnumerable will eagerly load the next item before the consumer requests it, it's important that its operation doesn't depend on the results of your consumer in order to run. For example, if the consumer saves a value in a database to indicate state, and the next call to the producer depends on that value, you cannot use EagerAsyncEnumerable, as the producer could race ahead before the consumer saves that value, breaking your state.
Catch #3 - Both your consumer and your producer run asynchronously.
If either your producer or your consumer effectively run synchronously, or take a miniscule amount
of time to finish, there's absolutely no benefit to using this. You are creating overhead for no
gain, as this would run no faster than just iterating over your base IAsyncEnumerable
.
Catch #4 - Cancelling likely means ignoring what the Producer did.
EagerAsyncEnumerable loads the entire enumerable before you request it, so obviously if you cancel part-way through consuming it, your source enumerable still might have been ran all the way to completion. At best, you wasted time loading all those items for absolutely nothing. At worst, this could leave your business data in an inconsistent state, if it wasn't supposed to perform that extra enumeration.
Like using Parallel.ForEach
or Tasks.WhenAll
, clearly if you use this, you expect cancelling to
not be a normal operation, but an exceptional one, and you know there won't be consequences of
doing so.
Is there really a benefit to using a queue?
You might see this implementation and wonder if it's overly complicated. Couldn't we achieve the same results by caching only a single move at a time, instead of all of them?
Take the above example. The producer, ProduceInts
, takes longer than the consumer, ConsumeInt
.
Because of this, every time you produce an item, the consumer will consume it, and sit waiting for
the next one. It's impossible that you'll ever have more than 1 item that would be queued at a
time. Thus, rather than having a queue at all, why not just cache the task to move to the next
item, await that when the consumer requests it, then move to the next one before yielding the
result? Same results, much less complexity, no need for a queue.
And even if the producer was the fast one, it'd be no different. Let's say ProduceInts
takes
50ms and ConsumeInts
takes 100ms. Sure, you could queue multiple items from ProduceInts
while ConsumeInts
is running, but ultimately ConsumeInts
is going to be the bottleneck. The
best it can do is consume one int every 100ms. Thus, it still takes 1050ms to run: 50 ms for the
first produce, then 1000ms to consume the 10 ints. Whether you produce & queue all the ints at the
very beginning of the program, or you only produce one while the consumer is running makes
absolutely no difference.
The reality is, if your producer and consumer are predictable in how long they take, there really is no benefit to queuing anything. Because one side, the producer or consumer, is always going to be the bottleneck, you can only move as fast as that.
Nonetheless my implementation still uses a queue. Why? For when it's unpredictable. Say consuming the first item takes a really long time for some reason, but all the other instances are comparatively quick. At that point, since the producer went ahead and queued a bunch, the consumer can just zip through all of them one after the other without having to wait on the producer.
Now, in normal operation, your producer and consumer probably won't be so unpredictable as to notice. But, even in benchmarking, there's not much overhead to using a queued approach. The CPU usage is dwarfed by your asynchronous operation, exactly as you'd expect. And it only requires a few extra allocations for the queue. So, quite simply, it doesn't detract to use it, but it could potentially really benefit, so why not use it?
And let's face it, the real world really isn't so predictable. API slowdowns, DB slowdowns, who knows what could happen.
How To Use
It's as easy as calling the AsEagerEnumerable
extension method on any of the following:
- IAsyncEnumerable<T>
IEnumerable<Task>
IEnumerable<Task<T>>
IEnumerable<ValueTask>
IEnumerable<ValueTask<T>>
Then, use await foreach
as you otherwise would.
Or, alternatively, you can use the GetEagerEnumerator
extension if you'd rather work directly
with the IAsyncEnumerator<T>.
Just note that using this will immediately start loading from the source, before you even make the
first MoveNextAsync
call. That could be a good thing, or a bad thing, depending.
You can optionally supply a CancellationToken, as you desire, but don't forget: you cannot predict how many items were actually iterated over your source enumerable the moment you cancel. So long as that won't cause you any state problems or issues with your business data, go right ahead and cancel as required.
Basically, use CancellationToken
for exceptions, like a user explicitly desiring to cancel, or
some sort of operational failure.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- Microsoft.Bcl.AsyncInterfaces (>= 7.0.0)
-
.NETStandard 2.1
- No dependencies.
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.