46

ある時点で完了待ちになっているタスクの数を制限して、非同期タスクの束を実行したいと思います。

たとえば、1000個のURLがあり、一度に50個の要求だけを開きたいとします。ただし、1つの要求が完了するとすぐに、リスト内の次のURLへの接続が開きます。そのようにして、URLリストが使い果たされるまで、常に正確に50接続が開かれています。

可能であれば、私は与えられた数のスレッドを利用したいと思います。

拡張方法を思いついた、ThrottleTasksAsyncそれは私が欲しいものです。もっと簡単な解決策はもうありますか?これは一般的なシナリオだと思います。

使用法:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

これがコードです:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

方法は利用するBlockingCollectionそしてSemaphoreSlimそれを機能させるために。スロットルは一方のスレッドで実行され、すべての非同期タスクは他方のスレッドで実行されます。並列処理を実現するために、maxDegreeOfParallelismパラメータを追加しました。Parallel.ForEachループは、whileループ。

古いバージョンは次のとおりです。

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

しかし、スレッドプールは早く使い尽くされるので、できません。async/await

ボーナス:で問題を回避するにはBlockingCollection例外がスローされる場所Take()いつCompleteAdding()と呼ばれる、私は使っているTryTakeタイムアウトでオーバーロードします。でタイムアウトを使用しなかった場合TryTake、それを使用する目的を無効にするだろうBlockingCollection以来TryTakeブロックしません。もっと良い方法はありますか?理想的には、TakeAsync方法。


  • もっと良い方法はありますか?はい、TPLデータフロー。 - Scott Chamberlain
  • urlの例では、すべてのURLをConcurrentBagに入れて50スレッドを開始し、各スレッドでURLを取得してバッグが空になるまでリクエストを実行できます。 - Bogdan
  • 一般的なケースでは、ConcurrentBag of delegatesを使用してください:) - Bogdan
  • @Bogdan私は何千ものリクエストを行っているので、それらをすべて同じスレッドで実行したいと思います。await。のParallel.ForEach2つか4つの効果を同時に達成するwhileループします。 - Josh Wyant
  • @Scott Chamberlain TPLデータフローをどのように使用すれば状況が改善されますか? - Josh Wyant

3 답변


50

推奨されているように、TPLデータフローを使用してください。

ATransformBlock<TInput, TOutput>あなたが探しているものかもしれません。

あなたが定義するMaxDegreeOfParallelism同時に変換できる文字列の数(つまりダウンロードできるURLの数)を制限するため。その後、ブロックにURLを投稿し、完了したらブロックにアイテムの追加が完了したことを伝え、応答を取得します。

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

注:TransformBlock入力と出力の両方をバッファします。では、それでは、なぜそれをBufferBlock

なぜならTransformBlockすべてのアイテムが完成するまで完了しません(HttpResponse)消費されたawait downloader.Completionハングします。代わりに、downloaderすべての出力を専用のバッファブロックに転送します。downloader完了し、バッファブロックを検査します。


  • +1なんて優雅な解決策でしょう。コードが少なくなり、機能も増えました。 - BlueM
  • うまくいけば、私のコードは少なくともその点を説明した:)私の場合、並列処理はおそらく低いでしょうが、多くの非同期タスクがポストされるでしょう。このモデルを使用して非同期タスクを調整する方法 - Josh Wyant
  • @JoshWyant一度にダウンロードできるURLの数を制限するという意味ですか?を使うMaxDegreeOfParallelism - dcastro
  • @JoshWyant上記のコードでは、必要なだけURLを投稿します(SendAsync)チーズはブロックで緩衝されます。ブロックはバッファからURLを取得し続け、一度に50を超えることはありません。結果は別のバッファに入れられます。 ATransformBlock入力と出力の両方をバッファします。 - dcastro
  • @dcastroそこで、最終的にはDataflowソリューションを使用しました。私の最初の恐怖はそれでしたMaxDegreeOfParallelismまったく同じように働いたParallel.ForEach並列処理を実現するために、任意の数のスレッドを作成するだけです。私は間違っていた、そしてパラメータは非常にうまく機能しますasync。 Tpl.Dataflowは美しく機能します。ありがとうございます。 - Josh Wyant

38

あなたが1000個のURLを持っていて、あなたが開いているのは50個のリクエストだけだとしましょう。   時間; 1つのリクエストが完了するとすぐに、あなたは接続を開く   リスト内の次のURLに移動します。そのように、常に正確に50があります   URLリストが使い果たされるまで、接続は一度に開きます。

次のような簡単な解決策がここに何度も出てきました。ブロッキングコードを使用せず、スレッドを明示的に作成しません。そのため、拡張性は非常に優れています。

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

事は、処理ダウンロードしたデータの違うパイプライン違う特にCPUバウンド処理の場合は、並列処理のレベル。

たとえば、4つのスレッドで同時にデータ処理を行い(CPUコアの数)、さらに多くのデータを処理するために最大50の保留中の要求(スレッドをまったく使用しない)を使用することをお勧めします。残念ながら、これはあなたのコードが現在行っていることではありません。

それがTPL DataflowまたはRxが好ましい解決策として役立つかもしれないところです。それでも、普通のTPLでこのようなものを実装することは確かに可能です。ここでの唯一のブロックコードは、実際のデータ処理を内部で行うコードです。Task.Run

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}


  • これが最も単純で最も簡単な答えです。それは、私がやろうとしていたこととほとんど同じです。私の間違いは、セマフォを別のスレッドで実行しようとしたことですが、これにより、セマフォが非常に簡単になり、BlockingCollection。使用できることがわかりませんでしたWaitAsyncそのように。 @Noseratioありがとうございます。 - Josh Wyant
  • @ JoshWyant、問題ありません。そのパイプラインが適切に設計され組み立てられていれば、これはTPLデータフローが舞台裏でやることとほぼ同じだと私は思います。 TPLデータフローのスキルが不足しているだけなのですが、もっと時間をかけていきます。 - noseratio
  • あなたは正しいです。それを理解すれば、TPL Dataflowは美しく働きます。それは配布する問題を処理しますasync私のもう1つの目標は、複数のコアで作業することです。この答えは私の最初の目標に向けたもので、Dataflowはそれらの両方に向けたものです。 @Noseratio - Josh Wyant
  • 同じエンドポイントに到達したときに、.netコアのデフォルトの新しいHttpClient()でこれをテストする場合は注意してください。 new HttpClient(new HttpClientHandler {MaxConnectionsPerServer = ...})を指定しない限り、デフォルトでは1サーバーあたりの接続数が制限されます(これは、2つに制限されていたfiddlerの場合)。この回答に含まれるものはすべて宣伝されているとおりに機能しますが、それでもその設定によって制限されることがあります。 - Tom

3

要求されたように、これは私が一緒に行くことになったコードです。

作業はマスター/ディテール構成で設定され、各マスターはバッチとして処理されます。各作業単位は、次のようにキューに入れられます。

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

他の外部プロセスの作業を節約するために、マスターは一度に1つずつバッファーに入れられます。各マスターの詳細は、次の方法で仕事に派遣されます。masterTransform TransformManyBlock。 ABatchedJoinBlock1つのバッチで詳細を収集するためにも作成されます。

実際の作業はdetailTransform TransformBlock非同期に、一度に150BoundedCapacityチェーンの開始時にあまりにも多くのマスターがバッファーに入れられないようにするために300に設定され、同時に150レコードを一度に処理できるように十分な詳細レコードをキューに入れる余地があります。ブロックはobjectそれがターゲットかどうかに応じてリンクを越えてフィルタリングされるからです。DetailまたはException

batchAction ActionBlockすべてのバッチから出力を収集し、バッチごとに一括データベース更新、エラーロギングなどを実行します。

いくつかありますBatchedJoinBlocks、マスターごとに1つそれぞれ以来ISourceBlockは順次出力され、各バッチは1つのマスターに関連付けられた詳細レコードの数だけを受け入れます。バッチは順番に処理されます。各ブロックは1つのグループのみを出力し、完了時にリンク解除されます。最後のバッチブロックだけが完了を最後のブロックに伝えます。ActionBlock

データフローネットワーク

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });

リンクされた質問


関連する質問

最近の質問