주어진 시간에 완료 될 보류중인 작업의 수에 제한이있는 비동기 작업을 여러 개 실행하고 싶습니다.
1000 개의 URL이 있으며 한번에 50 개의 요청 만 열어 보려고한다고 가정 해보십시오. 그러나 하나의 요청이 완료되자 마자 목록의 다음 URL에 대한 연결이 열립니다. 이렇게하면 URL 목록을 모두 사용할 때까지 한 번에 정확히 50 개의 연결 만 열립니다.
가능하다면 주어진 수의 스레드를 활용하고 싶습니다.
나는 확장 방법을 고안했다.ThrottleTasksAsync
그게 내가 원하는 걸 해. 거기에 이미 간단한 해결책이 있습니까? 나는 이것이 일반적인 시나리오라고 생각할 것이다.
용법:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
다음은 코드입니다.
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
이 방법은BlockingCollection
과SemaphoreSlim
그것을 작동하게합니다. throttler는 하나의 스레드에서 실행되며 모든 비동기 작업은 다른 스레드에서 실행됩니다. 병렬성을 달성하기 위해 maxDegreeOfParallelism 매개 변수를 추가하여Parallel.ForEach
루프는 다시while
고리.
이전 버전은 다음과 같습니다.
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
그러나 스레드 풀이 빨리 소모되고 수행 할 수 없습니다.async
/await
.
보너스:에서 문제를 해결하려면BlockingCollection
예외가 던져지는 곳Take()
언제CompleteAdding()
내가 전화를 걸면TryTake
타임 아웃으로 오버로드. 시간 초과를 사용하지 않은 경우TryTake
, 그것의 목적을 패배시킬 것이다.BlockingCollection
이후TryTake
차단하지 않습니다. 더 좋은 방법이 있습니까? 이상적으로는TakeAsync
방법.
await
. 그만큼Parallel.ForEach
2 ~ 4 개의 동시 발생 효과를 얻는다.while
루프. - Josh Wyant