주어진 시간에 완료 될 보류중인 작업의 수에 제한이있는 비동기 작업을 여러 개 실행하고 싶습니다.
1000 개의 URL이 있으며 한번에 50 개의 요청 만 열어 보려고한다고 가정 해보십시오. 그러나 하나의 요청이 완료되자 마자 목록의 다음 URL에 대한 연결이 열립니다. 이렇게하면 URL 목록을 모두 사용할 때까지 한 번에 정확히 50 개의 연결 만 열립니다.
가능하다면 주어진 수의 스레드를 활용하고 싶습니다.
나는 확장 방법을 고안했다.ThrottleTasksAsync그게 내가 원하는 걸 해. 거기에 이미 간단한 해결책이 있습니까? 나는 이것이 일반적인 시나리오라고 생각할 것이다.
용법:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
다음은 코드입니다.
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
이 방법은BlockingCollection과SemaphoreSlim그것을 작동하게합니다. throttler는 하나의 스레드에서 실행되며 모든 비동기 작업은 다른 스레드에서 실행됩니다. 병렬성을 달성하기 위해 maxDegreeOfParallelism 매개 변수를 추가하여Parallel.ForEach루프는 다시while고리.
이전 버전은 다음과 같습니다.
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
그러나 스레드 풀이 빨리 소모되고 수행 할 수 없습니다.async/await.
보너스:에서 문제를 해결하려면BlockingCollection예외가 던져지는 곳Take()언제CompleteAdding()내가 전화를 걸면TryTake타임 아웃으로 오버로드. 시간 초과를 사용하지 않은 경우TryTake, 그것의 목적을 패배시킬 것이다.BlockingCollection이후TryTake차단하지 않습니다. 더 좋은 방법이 있습니까? 이상적으로는TakeAsync방법.
제안 된대로 TPL 데이터 흐름을 사용하십시오.
에이TransformBlock<TInput, TOutput>네가 찾고있는 것일 수도있어.
당신은MaxDegreeOfParallelism병렬로 얼마나 많은 문자열을 변환 할 수 있는지 (즉, 얼마나 많은 URL을 다운로드 할 수 있는지) 제한합니다. 그런 다음 블록에 URL을 게시하고 작업이 끝나면 블록 추가에 항목을 추가하고 응답을 가져옵니다.
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
주 :TransformBlock입력과 출력을 모두 버퍼링합니다. 그렇다면 왜 우리는 그것을BufferBlock?
왜냐하면TransformBlock모든 항목이 완료 될 때까지 완료되지 않습니다.HttpResponse)가 소비되었고,await downloader.Completion멈출거야. 대신, 우리는downloader모든 출력을 전용 버퍼 블록으로 전달합니다. 그런 다음downloader버퍼 블록을 완료하고 검사하십시오.
MaxDegreeOfParallelism - dcastroSendAsync). Theese는 블록에 의해 버퍼링됩니다. 블록은 버퍼에서 URL을 가져 와서 한 번에 50 개를 처리하지 않습니다. 그 결과는 다른 버퍼에 저장됩니다. 에이TransformBlock입력과 출력을 모두 버퍼링합니다. - dcastroMaxDegreeOfParallelism정확히 같이 일했다.Parallel.ForEach병렬 처리를 위해 임의의 양의 스레드를 생성하는 것입니다. 나는 틀렸어. 매개 변수는 매우 잘 작동한다.async. Tpl.Dataflow는 아름답게 작동합니다. 감사! - Josh Wyant
1000 개의 URL이 있고 50 개의 요청 만 열려고한다고 가정 해 보겠습니다. 시간; 하지만 하나의 요청이 완료되자 마자 연결을 열어 목록의 다음 URL로 이동하십시오. 그렇게하면 항상 정확히 50 연결은 URL 목록이 모두 소모 될 때까지 한 번에 열립니다.
다음 간단한 솔루션이 여기에 여러 번 등장했습니다. 블로킹 코드를 사용하지 않고 명시 적으로 스레드를 생성하지 않으므로 확장 성이 뛰어납니다.
const int MAX_DOWNLOADS = 50;
static async Task DownloadAsync(string[] urls)
{
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
Console.WriteLine(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
}
}
문제는가공다운로드 한 데이터의다른파이프 라인,다른병렬 처리 수준, 특히 CPU 바인딩 처리 인 경우.
예를 들어 데이터 처리 (CPU 코어 수)를 동시에 수행하는 4 개의 스레드와 더 많은 데이터 (스레드를 전혀 사용하지 않는)에 대한 최대 50 개의 보류중인 요청을 원할 수 있습니다. AFAICT, 이것은 현재 귀하의 코드가하는 것이 아닙니다.
TPL Dataflow 또는 Rx가 선호되는 솔루션으로 유용 할 수 있습니다. 그러나 일반 TPL로 이와 같은 것을 구현하는 것이 가능합니다. 여기서 유일한 차단 코드는 내부에서 실제 데이터 처리를 수행하는 코드입니다.Task.Run:
const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;
// process data
class Processing
{
SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
HashSet<Task> _pending = new HashSet<Task>();
object _lock = new Object();
async Task ProcessAsync(string data)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() =>
{
// simuate work
Thread.Sleep(1000);
Console.WriteLine(data);
});
}
finally
{
_semaphore.Release();
}
}
public async void QueueItemAsync(string data)
{
var task = ProcessAsync(data);
lock (_lock)
_pending.Add(task);
try
{
await task;
}
catch
{
if (!task.IsCanceled && !task.IsFaulted)
throw; // not the task's exception, rethrow
// don't remove faulted/cancelled tasks from the list
return;
}
// remove successfully completed tasks from the list
lock (_lock)
_pending.Remove(task);
}
public async Task WaitForCompleteAsync()
{
Task[] tasks;
lock (_lock)
tasks = _pending.ToArray();
await Task.WhenAll(tasks);
}
}
// download data
static async Task DownloadAsync(string[] urls)
{
var processing = new Processing();
using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
using (var httpClient = new HttpClient())
{
var tasks = urls.Select(async (url) =>
{
await semaphore.WaitAsync();
try
{
var data = await httpClient.GetStringAsync(url);
// put the result on the processing pipeline
processing.QueueItemAsync(data);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks.ToArray());
await processing.WaitForCompleteAsync();
}
}
BlockingCollection. 나는 내가 사용할 수 있다는 것을 깨닫지 못했다.WaitAsync그런 식으로. 고마워 .Noseratio. - Josh Wyantasync내 다른 목표였던 여러 코어로 작업하십시오. 이 답변은 저의 첫 번째 목표를 다루며, Dataflow는이 두 가지 문제를 해결합니다. @Noseratio - Josh Wyant
요청에 따라, 여기에 내가 끝내는 코드가 있습니다.
작업은 마스터 - 세부 구성으로 설정되며 각 마스터는 일괄 처리로 처리됩니다. 각 작업 단위 (UOW)는 다음과 같이 대기열에 있습니다.
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
마스터는 다른 외부 프로세스에 대한 작업을 저장하기 위해 한 번에 하나씩 버퍼링됩니다. 각 마스터의 세부 정보는masterTransform
TransformManyBlock. 에이BatchedJoinBlock하나의 배치로 세부 정보를 수집하기 위해 생성됩니다.
실제 작업은detailTransform
TransformBlock, 비동기 적으로, 한 번에 150.BoundedCapacity너무 많은 마스터가 체인의 시작 부분에서 버퍼되지 않도록하고 300 개의 레코드를 한 번에 처리 할 수 있도록 대기열에 보관할 충분한 세부 레코드 공간을 남겨 두도록 300으로 설정됩니다. 블록은object링크가 타겟인지 여부에 따라 링크를 통해 필터링되므로Detail또는Exception.
그만큼batchAction
ActionBlock모든 배치에서 출력을 수집하고 각 배치에 대해 대량 데이터베이스 업데이트, 오류 로깅 등을 수행합니다.
몇 가지가있을 것입니다.BatchedJoinBlocks, 각 마스터마다 하나씩. 이후 각ISourceBlock순차적으로 출력되며 각 일괄 처리는 하나의 마스터와 관련된 세부 레코드 수만 허용하므로 일괄 처리가 순서대로 처리됩니다. 각 블록은 하나의 그룹 만 출력하고 완료되면 연결 해제됩니다. 마지막 배치 블록 만 완료를 최종 위치로 전달합니다.ActionBlock.
데이터 흐름 네트워크 :
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
await. 그만큼Parallel.ForEach2 ~ 4 개의 동시 발생 효과를 얻는다.while루프. - Josh Wyant