ある時点で完了待ちになっているタスクの数を制限して、非同期タスクの束を実行したいと思います。
たとえば、1000個のURLがあり、一度に50個の要求だけを開きたいとします。ただし、1つの要求が完了するとすぐに、リスト内の次のURLへの接続が開きます。そのようにして、URLリストが使い果たされるまで、常に正確に50接続が開かれています。
可能であれば、私は与えられた数のスレッドを利用したいと思います。
拡張方法を思いついた、ThrottleTasksAsync
それは私が欲しいものです。もっと簡単な解決策はもうありますか?これは一般的なシナリオだと思います。
使用法:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
これがコードです:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
方法は利用するBlockingCollection
そしてSemaphoreSlim
それを機能させるために。スロットルは一方のスレッドで実行され、すべての非同期タスクは他方のスレッドで実行されます。並列処理を実現するために、maxDegreeOfParallelismパラメータを追加しました。Parallel.ForEach
ループは、while
ループ。
古いバージョンは次のとおりです。
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
しかし、スレッドプールは早く使い尽くされるので、できません。async
/await
。
ボーナス:で問題を回避するにはBlockingCollection
例外がスローされる場所Take()
いつCompleteAdding()
と呼ばれる、私は使っているTryTake
タイムアウトでオーバーロードします。でタイムアウトを使用しなかった場合TryTake
、それを使用する目的を無効にするだろうBlockingCollection
以来TryTake
ブロックしません。もっと良い方法はありますか?理想的には、TakeAsync
方法。
await
。のParallel.ForEach
2つか4つの効果を同時に達成するwhile
ループします。 - Josh Wyant