46

ある時点で完了待ちになっているタスクの数を制限して、非同期タスクの束を実行したいと思います。

たとえば、1000個のURLがあり、一度に50個の要求だけを開きたいとします。ただし、1つの要求が完了するとすぐに、リスト内の次のURLへの接続が開きます。そのようにして、URLリストが使い果たされるまで、常に正確に50接続が開かれています。

可能であれば、私は与えられた数のスレッドを利用したいと思います。

拡張方法を思いついた、ThrottleTasksAsyncそれは私が欲しいものです。もっと簡単な解決策はもうありますか?これは一般的なシナリオだと思います。

使用法:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

これがコードです:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

方法は利用するBlockingCollectionそしてSemaphoreSlimそれを機能させるために。スロットルは一方のスレッドで実行され、すべての非同期タスクは他方のスレッドで実行されます。並列処理を実現するために、maxDegreeOfParallelismパラメータを追加しました。Parallel.ForEachループは、whileループ。

古いバージョンは次のとおりです。

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

しかし、スレッドプールは早く使い尽くされるので、できません。async/await

ボーナス:で問題を回避するにはBlockingCollection例外がスローされる場所Take()いつCompleteAdding()と呼ばれる、私は使っているTryTakeタイムアウトでオーバーロードします。でタイムアウトを使用しなかった場合TryTake、それを使用する目的を無効にするだろうBlockingCollection以来TryTakeブロックしません。もっと良い方法はありますか?理想的には、TakeAsync方法。


  • もっと良い方法はありますか?はい、TPLデータフロー。 - Scott Chamberlain
  • urlの例では、すべてのURLをConcurrentBagに入れて50スレッドを開始し、各スレッドでURLを取得してバッグが空になるまでリクエストを実行できます。 - Bogdan
  • 一般的なケースでは、ConcurrentBag of delegatesを使用してください:) - Bogdan
  • @Bogdan私は何千ものリクエストを行っているので、それらをすべて同じスレッドで実行したいと思います。await。のParallel.ForEach2つか4つの効果を同時に達成するwhileループします。 - Josh Wyant
  • @Scott Chamberlain TPLデータフローをどのように使用すれば状況が改善されますか? - Josh Wyant

リンクされた質問


関連する質問

最近の質問