주어진 시간에 완료 될 보류중인 작업의 수에 제한이있는 비동기 작업을 여러 개 실행하고 싶습니다.
1000 개의 URL이 있으며 한번에 50 개의 요청 만 열어 보려고한다고 가정 해보십시오. 그러나 하나의 요청이 완료되자 마자 목록의 다음 URL에 대한 연결이 열립니다. 이렇게하면 URL 목록을 모두 사용할 때까지 한 번에 정확히 50 개의 연결 만 열립니다.
가능하다면 주어진 수의 스레드를 활용하고 싶습니다.
나는 확장 방법을 고안했다.ThrottleTasksAsync
그게 내가 원하는 걸 해. 거기에 이미 간단한 해결책이 있습니까? 나는 이것이 일반적인 시나리오라고 생각할 것이다.
용법:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
다음은 코드입니다.
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
이 방법은BlockingCollection
과SemaphoreSlim
그것을 작동하게합니다. throttler는 하나의 스레드에서 실행되며 모든 비동기 작업은 다른 스레드에서 실행됩니다. 병렬성을 달성하기 위해 maxDegreeOfParallelism 매개 변수를 추가하여Parallel.ForEach
루프는 다시while
고리.
이전 버전은 다음과 같습니다.
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
그러나 스레드 풀이 빨리 소모되고 수행 할 수 없습니다.async
/await
.
보너스:에서 문제를 해결하려면BlockingCollection
예외가 던져지는 곳Take()
언제CompleteAdding()
내가 전화를 걸면TryTake
타임 아웃으로 오버로드. 시간 초과를 사용하지 않은 경우TryTake
, 그것의 목적을 패배시킬 것이다.BlockingCollection
이후TryTake
차단하지 않습니다. 더 좋은 방법이 있습니까? 이상적으로는TakeAsync
방법.
제안 된대로 TPL 데이터 흐름을 사용하십시오.
에이TransformBlock<TInput, TOutput>
네가 찾고있는 것일 수도있어.
당신은MaxDegreeOfParallelism
병렬로 얼마나 많은 문자열을 변환 할 수 있는지 (즉, 얼마나 많은 URL을 다운로드 할 수 있는지) 제한합니다. 그런 다음 블록에 URL을 게시하고 작업이 끝나면 블록 추가에 항목을 추가하고 응답을 가져옵니다.
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
주 :TransformBlock
입력과 출력을 모두 버퍼링합니다. 그렇다면 왜 우리는 그것을BufferBlock
?
왜냐하면TransformBlock
모든 항목이 완료 될 때까지 완료되지 않습니다.HttpResponse
)가 소비되었고,await downloader.Completion
멈출거야. 대신, 우리는downloader
모든 출력을 전용 버퍼 블록으로 전달합니다. 그런 다음downloader
버퍼 블록을 완료하고 검사하십시오.
MaxDegreeOfParallelism
- dcastroSendAsync
). Theese는 블록에 의해 버퍼링됩니다. 블록은 버퍼에서 URL을 가져 와서 한 번에 50 개를 처리하지 않습니다. 그 결과는 다른 버퍼에 저장됩니다. 에이TransformBlock
입력과 출력을 모두 버퍼링합니다. - dcastroMaxDegreeOfParallelism
정확히 같이 일했다.Parallel.ForEach
병렬 처리를 위해 임의의 양의 스레드를 생성하는 것입니다. 나는 틀렸어. 매개 변수는 매우 잘 작동한다.async
. Tpl.Dataflow는 아름답게 작동합니다. 감사! - Josh Wyant
1000 개의 URL이 있고 50 개의 요청 만 열려고한다고 가정 해 보겠습니다. 시간; 하지만 하나의 요청이 완료되자 마자 연결을 열어 목록의 다음 URL로 이동하십시오. 그렇게하면 항상 정확히 50 연결은 URL 목록이 모두 소모 될 때까지 한 번에 열립니다.
다음 간단한 솔루션이 여기에 여러 번 등장했습니다. 블로킹 코드를 사용하지 않고 명시 적으로 스레드를 생성하지 않으므로 확장 성이 뛰어납니다.
const int MAX_DOWNLOADS = 50;
static async Task DownloadAsync(string[] urls)
{
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
Console.WriteLine(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
}
문제는가공다운로드 한 데이터의다른파이프 라인,다른병렬 처리 수준, 특히 CPU 바인딩 처리 인 경우.
예를 들어 데이터 처리 (CPU 코어 수)를 동시에 수행하는 4 개의 스레드와 더 많은 데이터 (스레드를 전혀 사용하지 않는)에 대한 최대 50 개의 보류중인 요청을 원할 수 있습니다. AFAICT, 이것은 현재 귀하의 코드가하는 것이 아닙니다.
TPL Dataflow 또는 Rx가 선호되는 솔루션으로 유용 할 수 있습니다. 그러나 일반 TPL로 이와 같은 것을 구현하는 것이 가능합니다. 여기서 유일한 차단 코드는 내부에서 실제 데이터 처리를 수행하는 코드입니다.Task.Run
:
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;
// process data
class Processing
{
SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
HashSet<Task> _pending = new HashSet<Task>();
object _lock = new Object();
async Task ProcessAsync(string data)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() =>
{
// simuate work
Thread.Sleep(1000);
Console.WriteLine(data);
});
}
finally
{
_semaphore.Release();
}
}
public async void QueueItemAsync(string data)
{
var task = ProcessAsync(data);
lock (_lock)
_pending.Add(task);
try
{
await task;
}
catch
{
if (!task.IsCanceled && !task.IsFaulted)
throw; // not the task's exception, rethrow
// don't remove faulted/cancelled tasks from the list
return;
}
// remove successfully completed tasks from the list
lock (_lock)
_pending.Remove(task);
}
public async Task WaitForCompleteAsync()
{
Task[] tasks;
lock (_lock)
tasks = _pending.ToArray();
await Task.WhenAll(tasks);
}
}
// download data
static async Task DownloadAsync(string[] urls)
{
var processing = new Processing();
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async (url) =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
// put the result on the processing pipeline
processing.QueueItemAsync(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks.ToArray());
await processing.WaitForCompleteAsync();
}
}
BlockingCollection
. 나는 내가 사용할 수 있다는 것을 깨닫지 못했다.WaitAsync
그런 식으로. 고마워 .Noseratio. - Josh Wyantasync
내 다른 목표였던 여러 코어로 작업하십시오. 이 답변은 저의 첫 번째 목표를 다루며, Dataflow는이 두 가지 문제를 해결합니다. @Noseratio - Josh Wyant
요청에 따라, 여기에 내가 끝내는 코드가 있습니다.
작업은 마스터 - 세부 구성으로 설정되며 각 마스터는 일괄 처리로 처리됩니다. 각 작업 단위 (UOW)는 다음과 같이 대기열에 있습니다.
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
마스터는 다른 외부 프로세스에 대한 작업을 저장하기 위해 한 번에 하나씩 버퍼링됩니다. 각 마스터의 세부 정보는masterTransform
TransformManyBlock
. 에이BatchedJoinBlock
하나의 배치로 세부 정보를 수집하기 위해 생성됩니다.
실제 작업은detailTransform
TransformBlock
, 비동기 적으로, 한 번에 150.BoundedCapacity
너무 많은 마스터가 체인의 시작 부분에서 버퍼되지 않도록하고 300 개의 레코드를 한 번에 처리 할 수 있도록 대기열에 보관할 충분한 세부 레코드 공간을 남겨 두도록 300으로 설정됩니다. 블록은object
링크가 타겟인지 여부에 따라 링크를 통해 필터링되므로Detail
또는Exception
.
그만큼batchAction
ActionBlock
모든 배치에서 출력을 수집하고 각 배치에 대해 대량 데이터베이스 업데이트, 오류 로깅 등을 수행합니다.
몇 가지가있을 것입니다.BatchedJoinBlock
s, 각 마스터마다 하나씩. 이후 각ISourceBlock
순차적으로 출력되며 각 일괄 처리는 하나의 마스터와 관련된 세부 레코드 수만 허용하므로 일괄 처리가 순서대로 처리됩니다. 각 블록은 하나의 그룹 만 출력하고 완료되면 연결 해제됩니다. 마지막 배치 블록 만 완료를 최종 위치로 전달합니다.ActionBlock
.
데이터 흐름 네트워크 :
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
await
. 그만큼Parallel.ForEach
2 ~ 4 개의 동시 발생 효과를 얻는다.while
루프. - Josh Wyant