ある時点で完了待ちになっているタスクの数を制限して、非同期タスクの束を実行したいと思います。
たとえば、1000個のURLがあり、一度に50個の要求だけを開きたいとします。ただし、1つの要求が完了するとすぐに、リスト内の次のURLへの接続が開きます。そのようにして、URLリストが使い果たされるまで、常に正確に50接続が開かれています。
可能であれば、私は与えられた数のスレッドを利用したいと思います。
拡張方法を思いついた、ThrottleTasksAsync
それは私が欲しいものです。もっと簡単な解決策はもうありますか?これは一般的なシナリオだと思います。
使用法:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
これがコードです:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
方法は利用するBlockingCollection
そしてSemaphoreSlim
それを機能させるために。スロットルは一方のスレッドで実行され、すべての非同期タスクは他方のスレッドで実行されます。並列処理を実現するために、maxDegreeOfParallelismパラメータを追加しました。Parallel.ForEach
ループは、while
ループ。
古いバージョンは次のとおりです。
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
しかし、スレッドプールは早く使い尽くされるので、できません。async
/await
。
ボーナス:で問題を回避するにはBlockingCollection
例外がスローされる場所Take()
いつCompleteAdding()
と呼ばれる、私は使っているTryTake
タイムアウトでオーバーロードします。でタイムアウトを使用しなかった場合TryTake
、それを使用する目的を無効にするだろうBlockingCollection
以来TryTake
ブロックしません。もっと良い方法はありますか?理想的には、TakeAsync
方法。
推奨されているように、TPLデータフローを使用してください。
ATransformBlock<TInput, TOutput>
あなたが探しているものかもしれません。
あなたが定義するMaxDegreeOfParallelism
同時に変換できる文字列の数(つまりダウンロードできるURLの数)を制限するため。その後、ブロックにURLを投稿し、完了したらブロックにアイテムの追加が完了したことを伝え、応答を取得します。
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
注:TransformBlock
入力と出力の両方をバッファします。では、それでは、なぜそれをBufferBlock
?
なぜならTransformBlock
すべてのアイテムが完成するまで完了しません(HttpResponse
)消費されたawait downloader.Completion
ハングします。代わりに、downloader
すべての出力を専用のバッファブロックに転送します。downloader
完了し、バッファブロックを検査します。
MaxDegreeOfParallelism
- dcastroSendAsync
)チーズはブロックで緩衝されます。ブロックはバッファからURLを取得し続け、一度に50を超えることはありません。結果は別のバッファに入れられます。 ATransformBlock
入力と出力の両方をバッファします。 - dcastroMaxDegreeOfParallelism
まったく同じように働いたParallel.ForEach
並列処理を実現するために、任意の数のスレッドを作成するだけです。私は間違っていた、そしてパラメータは非常にうまく機能しますasync
。 Tpl.Dataflowは美しく機能します。ありがとうございます。 - Josh Wyant
あなたが1000個のURLを持っていて、あなたが開いているのは50個のリクエストだけだとしましょう。 時間; 1つのリクエストが完了するとすぐに、あなたは接続を開く リスト内の次のURLに移動します。そのように、常に正確に50があります URLリストが使い果たされるまで、接続は一度に開きます。
次のような簡単な解決策がここに何度も出てきました。ブロッキングコードを使用せず、スレッドを明示的に作成しません。そのため、拡張性は非常に優れています。
const int MAX_DOWNLOADS = 50;
static async Task DownloadAsync(string[] urls)
{
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
Console.WriteLine(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
}
事は、処理ダウンロードしたデータの違うパイプライン違う特にCPUバウンド処理の場合は、並列処理のレベル。
たとえば、4つのスレッドで同時にデータ処理を行い(CPUコアの数)、さらに多くのデータを処理するために最大50の保留中の要求(スレッドをまったく使用しない)を使用することをお勧めします。残念ながら、これはあなたのコードが現在行っていることではありません。
それがTPL DataflowまたはRxが好ましい解決策として役立つかもしれないところです。それでも、普通のTPLでこのようなものを実装することは確かに可能です。ここでの唯一のブロックコードは、実際のデータ処理を内部で行うコードです。Task.Run
:
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;
// process data
class Processing
{
SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
HashSet<Task> _pending = new HashSet<Task>();
object _lock = new Object();
async Task ProcessAsync(string data)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() =>
{
// simuate work
Thread.Sleep(1000);
Console.WriteLine(data);
});
}
finally
{
_semaphore.Release();
}
}
public async void QueueItemAsync(string data)
{
var task = ProcessAsync(data);
lock (_lock)
_pending.Add(task);
try
{
await task;
}
catch
{
if (!task.IsCanceled && !task.IsFaulted)
throw; // not the task's exception, rethrow
// don't remove faulted/cancelled tasks from the list
return;
}
// remove successfully completed tasks from the list
lock (_lock)
_pending.Remove(task);
}
public async Task WaitForCompleteAsync()
{
Task[] tasks;
lock (_lock)
tasks = _pending.ToArray();
await Task.WhenAll(tasks);
}
}
// download data
static async Task DownloadAsync(string[] urls)
{
var processing = new Processing();
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async (url) =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
// put the result on the processing pipeline
processing.QueueItemAsync(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks.ToArray());
await processing.WaitForCompleteAsync();
}
}
BlockingCollection
。使用できることがわかりませんでしたWaitAsync
そのように。 @Noseratioありがとうございます。 - Josh Wyantasync
私のもう1つの目標は、複数のコアで作業することです。この答えは私の最初の目標に向けたもので、Dataflowはそれらの両方に向けたものです。 @Noseratio - Josh Wyant
要求されたように、これは私が一緒に行くことになったコードです。
作業はマスター/ディテール構成で設定され、各マスターはバッチとして処理されます。各作業単位は、次のようにキューに入れられます。
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
他の外部プロセスの作業を節約するために、マスターは一度に1つずつバッファーに入れられます。各マスターの詳細は、次の方法で仕事に派遣されます。masterTransform
TransformManyBlock
。 ABatchedJoinBlock
1つのバッチで詳細を収集するためにも作成されます。
実際の作業はdetailTransform
TransformBlock
非同期に、一度に150BoundedCapacity
チェーンの開始時にあまりにも多くのマスターがバッファーに入れられないようにするために300に設定され、同時に150レコードを一度に処理できるように十分な詳細レコードをキューに入れる余地があります。ブロックはobject
それがターゲットかどうかに応じてリンクを越えてフィルタリングされるからです。Detail
またはException
。
のbatchAction
ActionBlock
すべてのバッチから出力を収集し、バッチごとに一括データベース更新、エラーロギングなどを実行します。
いくつかありますBatchedJoinBlock
s、マスターごとに1つそれぞれ以来ISourceBlock
は順次出力され、各バッチは1つのマスターに関連付けられた詳細レコードの数だけを受け入れます。バッチは順番に処理されます。各ブロックは1つのグループのみを出力し、完了時にリンク解除されます。最後のバッチブロックだけが完了を最後のブロックに伝えます。ActionBlock
。
データフローネットワーク
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
await
。のParallel.ForEach
2つか4つの効果を同時に達成するwhile
ループします。 - Josh Wyant