46

주어진 시간에 완료 될 보류중인 작업의 수에 제한이있는 비동기 작업을 여러 개 실행하고 싶습니다.

1000 개의 URL이 있으며 한번에 50 개의 요청 만 열어 보려고한다고 가정 해보십시오. 그러나 하나의 요청이 완료되자 마자 목록의 다음 URL에 대한 연결이 열립니다. 이렇게하면 URL 목록을 모두 사용할 때까지 한 번에 정확히 50 개의 연결 만 열립니다.

가능하다면 주어진 수의 스레드를 활용하고 싶습니다.

나는 확장 방법을 고안했다.ThrottleTasksAsync그게 내가 원하는 걸 해. 거기에 이미 간단한 해결책이 있습니까? 나는 이것이 일반적인 시나리오라고 생각할 것이다.

용법:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

다음은 코드입니다.

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

이 방법은BlockingCollectionSemaphoreSlim그것을 작동하게합니다. throttler는 하나의 스레드에서 실행되며 모든 비동기 작업은 다른 스레드에서 실행됩니다. 병렬성을 달성하기 위해 maxDegreeOfParallelism 매개 변수를 추가하여Parallel.ForEach루프는 다시while고리.

이전 버전은 다음과 같습니다.

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

그러나 스레드 풀이 빨리 소모되고 수행 할 수 없습니다.async/await.

보너스:에서 문제를 해결하려면BlockingCollection예외가 던져지는 곳Take()언제CompleteAdding()내가 전화를 걸면TryTake타임 아웃으로 오버로드. 시간 초과를 사용하지 않은 경우TryTake, 그것의 목적을 패배시킬 것이다.BlockingCollection이후TryTake차단하지 않습니다. 더 좋은 방법이 있습니까? 이상적으로는TakeAsync방법.


  • 더 좋은 방법이 있습니까? 예,TPL 데이터 흐름. - Scott Chamberlain
  • url 예제에서 모든 URL을 ConcurrentBag에 넣고 50 개의 스레드를 시작할 수 있으며 각 스레드에서 url을 가져 와서 bag이 비게 될 때까지 요청을 수행 할 수 있습니다. - Bogdan
  • 일반적인 경우에는 대리자의 ConcurrentBag를 사용하십시오. :) - Bogdan
  • @Bogdan 나는 수천 개의 요청을하고 있지만, 나는 같은 스레드에서 모두 그들을 사용하고 싶다.await. 그만큼Parallel.ForEach2 ~ 4 개의 동시 발생 효과를 얻는다.while루프. - Josh Wyant
  • @Scott Chamberlain TPL Dataflow를 어떻게 사용하면 상황이 개선 될까요? - Josh Wyant

연결된 질문


관련된 질문

최근 질문