46

주어진 시간에 완료 될 보류중인 작업의 수에 제한이있는 비동기 작업을 여러 개 실행하고 싶습니다.

1000 개의 URL이 있으며 한번에 50 개의 요청 만 열어 보려고한다고 가정 해보십시오. 그러나 하나의 요청이 완료되자 마자 목록의 다음 URL에 대한 연결이 열립니다. 이렇게하면 URL 목록을 모두 사용할 때까지 한 번에 정확히 50 개의 연결 만 열립니다.

가능하다면 주어진 수의 스레드를 활용하고 싶습니다.

나는 확장 방법을 고안했다.ThrottleTasksAsync그게 내가 원하는 걸 해. 거기에 이미 간단한 해결책이 있습니까? 나는 이것이 일반적인 시나리오라고 생각할 것이다.

용법:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

다음은 코드입니다.

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

이 방법은BlockingCollectionSemaphoreSlim그것을 작동하게합니다. throttler는 하나의 스레드에서 실행되며 모든 비동기 작업은 다른 스레드에서 실행됩니다. 병렬성을 달성하기 위해 maxDegreeOfParallelism 매개 변수를 추가하여Parallel.ForEach루프는 다시while고리.

이전 버전은 다음과 같습니다.

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

그러나 스레드 풀이 빨리 소모되고 수행 할 수 없습니다.async/await.

보너스:에서 문제를 해결하려면BlockingCollection예외가 던져지는 곳Take()언제CompleteAdding()내가 전화를 걸면TryTake타임 아웃으로 오버로드. 시간 초과를 사용하지 않은 경우TryTake, 그것의 목적을 패배시킬 것이다.BlockingCollection이후TryTake차단하지 않습니다. 더 좋은 방법이 있습니까? 이상적으로는TakeAsync방법.


  • 더 좋은 방법이 있습니까? 예,TPL 데이터 흐름. - Scott Chamberlain
  • url 예제에서 모든 URL을 ConcurrentBag에 넣고 50 개의 스레드를 시작할 수 있으며 각 스레드에서 url을 가져 와서 bag이 비게 될 때까지 요청을 수행 할 수 있습니다. - Bogdan
  • 일반적인 경우에는 대리자의 ConcurrentBag를 사용하십시오. :) - Bogdan
  • @Bogdan 나는 수천 개의 요청을하고 있지만, 나는 같은 스레드에서 모두 그들을 사용하고 싶다.await. 그만큼Parallel.ForEach2 ~ 4 개의 동시 발생 효과를 얻는다.while루프. - Josh Wyant
  • @Scott Chamberlain TPL Dataflow를 어떻게 사용하면 상황이 개선 될까요? - Josh Wyant

3 답변


50

제안 된대로 TPL 데이터 흐름을 사용하십시오.

에이TransformBlock<TInput, TOutput>네가 찾고있는 것일 수도있어.

당신은MaxDegreeOfParallelism병렬로 얼마나 많은 문자열을 변환 할 수 있는지 (즉, 얼마나 많은 URL을 다운로드 할 수 있는지) 제한합니다. 그런 다음 블록에 URL을 게시하고 작업이 끝나면 블록 추가에 항목을 추가하고 응답을 가져옵니다.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

주 :TransformBlock입력과 출력을 모두 버퍼링합니다. 그렇다면 왜 우리는 그것을BufferBlock?

왜냐하면TransformBlock모든 항목이 완료 될 때까지 완료되지 않습니다.HttpResponse)가 소비되었고,await downloader.Completion멈출거야. 대신, 우리는downloader모든 출력을 전용 버퍼 블록으로 전달합니다. 그런 다음downloader버퍼 블록을 완료하고 검사하십시오.


  • +1 우아한 솔루션. 코드가 적어지고 기능이 다양합니다. - BlueM
  • 다행히도 내 코드는 적어도 요점을 설명했다 :) 내 경우 병렬 처리는 아마도 낮을 것이지만 많은 비동기 작업이 게시 될 것이다. 이 모델을 사용하여 비동기 작업을 어떻게 조절할 수 있습니까? - Josh Wyant
  • @ JoshWyant 한 번에 다운로드 할 수있는 URL 수를 제한 하시겠습니까? 사용MaxDegreeOfParallelism - dcastro
  • @ JoshWyant 위 코드를 사용하면 원하는만큼 URL을 게시 할 수 있습니다 (SendAsync). Theese는 블록에 의해 버퍼링됩니다. 블록은 버퍼에서 URL을 가져 와서 한 번에 50 개를 처리하지 않습니다. 그 결과는 다른 버퍼에 저장됩니다. 에이TransformBlock입력과 출력을 모두 버퍼링합니다. - dcastro
  • @dcastro 그래서, 나는 궁극적으로 Dataflow 솔루션을 사용했다. 내 첫 두려움은MaxDegreeOfParallelism정확히 같이 일했다.Parallel.ForEach병렬 처리를 위해 임의의 양의 스레드를 생성하는 것입니다. 나는 틀렸어. 매개 변수는 매우 잘 작동한다.async. Tpl.Dataflow는 아름답게 작동합니다. 감사! - Josh Wyant

38

1000 개의 URL이 있고 50 개의 요청 만 열려고한다고 가정 해 보겠습니다.   시간; 하지만 하나의 요청이 완료되자 마자 연결을 열어   목록의 다음 URL로 이동하십시오. 그렇게하면 항상 정확히 50   연결은 URL 목록이 모두 소모 될 때까지 한 번에 열립니다.

다음 간단한 솔루션이 여기에 여러 번 등장했습니다. 블로킹 코드를 사용하지 않고 명시 적으로 스레드를 생성하지 않으므로 확장 성이 뛰어납니다.

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

문제는가공다운로드 한 데이터의다른파이프 라인,다른병렬 처리 수준, 특히 CPU 바인딩 처리 인 경우.

예를 들어 데이터 처리 (CPU 코어 수)를 동시에 수행하는 4 개의 스레드와 더 많은 데이터 (스레드를 전혀 사용하지 않는)에 대한 최대 50 개의 보류중인 요청을 원할 수 있습니다. AFAICT, 이것은 현재 귀하의 코드가하는 것이 아닙니다.

TPL Dataflow 또는 Rx가 선호되는 솔루션으로 유용 할 수 있습니다. 그러나 일반 TPL로 이와 같은 것을 구현하는 것이 가능합니다. 여기서 유일한 차단 코드는 내부에서 실제 데이터 처리를 수행하는 코드입니다.Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}


  • 이것은 가장 단순하고 가장 직접적인 대답입니다. 거의 내가하려는 일이 바로 그것입니다. 내 실수는 별도의 스레드에서 세마포를 실행하려고 시도했지만, 이렇게하면 훨씬 간단 해지고BlockingCollection. 나는 내가 사용할 수 있다는 것을 깨닫지 못했다.WaitAsync그런 식으로. 고마워 .Noseratio. - Josh Wyant
  • @ 조쉬와 이언트, 문제 없습니다. TPL Dataflow가 파이프 라인을 적절하게 설계하고 조립하면 장면 뒤에서 할 일은 거의 없다고 생각합니다. TPL 데이터 흐름 기술이 부족하다는 점만은 알고 있지만 시간을 많이 투자 할 것입니다. - noseratio
  • 너 &맞아. 일단 이해하면 TPL 데이터 흐름이 아름답게 작동합니다. 그것은 배포 문제를 처리합니다.async내 다른 목표였던 여러 코어로 작업하십시오. 이 답변은 저의 첫 번째 목표를 다루며, Dataflow는이 두 가지 문제를 해결합니다. @Noseratio - Josh Wyant
  • 동일한 끝점을 지정할 때 .net 코어에서 기본 새 HttpClient ()를 사용하여 이것을 테스트하면주의해야합니다. 기본적으로 새 HttpClient (새 HttpClientHandler {MaxConnectionsPerServer = ...})를 지정하지 않는 한 서버 당 연결 수를 제한합니다 (피들러에서 2로 제한하는 곳을 보았습니다). 이 답변의 모든 내용은 광고 된대로 작동하지만 그 설정으로는 여전히 제한 될 수 있습니다. - Tom

3

요청에 따라, 여기에 내가 끝내는 코드가 있습니다.

작업은 마스터 - 세부 구성으로 설정되며 각 마스터는 일괄 처리로 처리됩니다. 각 작업 단위 (UOW)는 다음과 같이 대기열에 있습니다.

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

마스터는 다른 외부 프로세스에 대한 작업을 저장하기 위해 한 번에 하나씩 버퍼링됩니다. 각 마스터의 세부 정보는masterTransform TransformManyBlock. 에이BatchedJoinBlock하나의 배치로 세부 정보를 수집하기 위해 생성됩니다.

실제 작업은detailTransform TransformBlock, 비동기 적으로, 한 번에 150.BoundedCapacity너무 많은 마스터가 체인의 시작 부분에서 버퍼되지 않도록하고 300 개의 레코드를 한 번에 처리 할 수 있도록 대기열에 보관할 충분한 세부 레코드 공간을 남겨 두도록 300으로 설정됩니다. 블록은object링크가 타겟인지 여부에 따라 링크를 통해 필터링되므로Detail또는Exception.

그만큼batchAction ActionBlock모든 배치에서 출력을 수집하고 각 배치에 대해 대량 데이터베이스 업데이트, 오류 로깅 등을 수행합니다.

몇 가지가있을 것입니다.BatchedJoinBlocks, 각 마스터마다 하나씩. 이후 각ISourceBlock순차적으로 출력되며 각 일괄 처리는 하나의 마스터와 관련된 세부 레코드 수만 허용하므로 일괄 처리가 순서대로 처리됩니다. 각 블록은 하나의 그룹 만 출력하고 완료되면 연결 해제됩니다. 마지막 배치 블록 만 완료를 최종 위치로 전달합니다.ActionBlock.

데이터 흐름 네트워크 :

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });

연결된 질문


관련된 질문

최근 질문