19

私は同期APIとスレッドプールを使って次のようなTCPサーバを開発しています。

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

私の目標は、できるだけ少ないリソース使用量でできるだけ多くの同時接続を処理することであると仮定すると、これは利用可能なスレッドの数によって急速に制限されるように思われます。ノンブロッキングタスクAPIを使用することで、少ないリソースでより多く処理できるようになると思います。

私の最初の印象は次のようなものです。

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

しかし、これがボトルネックの原因になる可能性があることは私を驚かせます。おそらく、HandleConnectionAsyncが最初の待ち状態になるまでに異常に長い時間がかかり、メインのacceptループが進行しなくなるでしょう。これで1つのスレッドしか使用されなくなるのでしょうか。それとも、ランタイムはそれが適していると考えるように複数のスレッド上で物事を魔法のように実行するでしょうか。

これらの2つのアプローチを組み合わせて、私のサーバーがアクティブに実行されているタスクの数に必要なスレッド数を正確に使用するようにしながら、IO操作で不要にスレッドをブロックしないようにする方法はありますか。

このような状況でスループットを最大化するための慣用的な方法はありますか?


  • async / awaitは現在のスレッド、afaikから機能します。タスクプールでスケジュールしたい場合は、明示的にタスクを使用する必要があります。つまり、現在の実装は純粋にシングルスレッドです。 - cwharris
  • これはあなたにすべての答えを与えるかもしれません:ソケット操作待ち。 - noseratio
  • @Noseratio、これはまったく異なる質問に答えますが、私がそれをどのように扱っているのかわかりません。その記事では、タスクベースのAPIを公開しているかのように、異なる非同期パターンを公開するAPIの待機について説明しています。 TcpListenerは確かにタスクベースのAPIを提供しています。 - captncraig
  • @ captncraig、確かに、私は最初に質問を理解していませんでした。よりよく理解して、ここに私の考え。 - noseratio

5 답변


22

プロファイリングテストで必要と思われる場合を除いて、フレームワークにスレッド処理を管理させ、余分なスレッドを作成しないようにします。特に電話がかかってきた場合HandleConnectionAsyncほとんどIOに制限されています。

とにかく、呼び出しスレッド(ディスパッチャ)を解放したいのなら、HandleConnectionAsync非常に簡単な解決策があります。あなたはから新しいスレッドにジャンプすることができますThreadPoolawait Yield()これは、初期スレッド(コンソールアプリ、WCFサービス)に同期コンテキストがインストールされていない実行環境でサーバーを実行している場合に機能します。通常はTCPサーバーの場合です。

[編集済み]以下はこれを説明しています(コードはもともとここに)注意、メインwhileloopは明示的にスレッドを作成しません。

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                task.Wait();
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static void Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        new Program().StartListener().Wait();
    }
}

あるいは、コードは以下のようになります。await Task.Yield()。注意、私は渡しますasyncλ〜λTask.Runまだやりたいから内部の非同期APIの恩恵を受けるHandleConnectionAsyncそして使うawaitそこで:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

[更新]コメントに基づくと、これがライブラリコードになる場合、実行環境は実際には未知であり、デフォルト以外の同期コンテキストを持つ可能性があります。この場合は、メインサーバーループをプールスレッドで実行します(同期コンテキストはありません)。

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                task.Wait();
        }
    });
}

このようにして、内部に作成されたすべての子タスクStartListenerクライアントコードの同期コンテキストの影響を受けません。だから、私は呼び出す必要はないだろうTask.ConfigureAwait(false)明示的にどこでも。 


  • とても興味深い。実行する環境を保証できない場合(これはライブラリ内にあります)、Task.Yieldのドキュメントには、SynchronizatonContextの切り替えが正しく行われていないと記載されています。 Task.Runは私の最も安全なオプションかもしれません。 - captncraig
  • @ captncraig、ドキュメント上の長い処理のためにそれを使用しないことを意味UIスレッド(タイトなループのようにawait Task.Yeild())それはsynだからです。 UIスレッドが使用するコンテキストPostMessageこれは、マウスやキーボードなどの他のユーザー入力メッセージを引き継ぎ、UIをブロックする可能性があるためです。そのすべてがコンテキストフリー環境には当てはまりません。Task.Yield()ただ使うThreadPool.QueueUserWorkItem便利な近道です。もう少し情報:stackoverflow.com/q/20319769/1768303 - noseratio
  • @Noseratioの使用を拡張してもらえますかTask.Runあなたのコードの中?私の理解しているように、これは主にのために待っている入出力操作です。TCPClientデータを受信します。なぜ新しいものがあるべきなのかThreadPoolのための発砲while (true)ループ? - Yuval Itzchakov
  • @YuvalItzchakov、それは「フックを脱ぐ」ためだけにあります。クライアントコードが呼び出し側スレッドで持つことができる任意の同期コンテキストの。これは1回限りのもので、に置き換えることができますawait tcpListener.AcceptTcpClientAsync().ConfigureAwait(false)。 - noseratio
  • すばらしいです。説明してくれてありがとう! - Yuval Itzchakov

8

既存の回答は正しく使用することを提案していますTask.Run(() => HandleConnection(client));しかし、なぜ説明されていません。

理由は次のとおりです。HandleConnectionAsync最初の待ち合わせには時間がかかることがあります。あなたが非同期IOを使うことに固執するなら(あなたがこのケースでそうするべきであるように)これはそれを意味しますHandleConnectionAsyncブロッキングすることなくCPUバウンドの作業を行っています。これはスレッドプールに最適なケースです。それは短く、ノンブロッキングのCPU作業を実行するために作られています。

そして、あなたは正しい、acceptループは次のように調整されるでしょう。HandleConnectionAsync戻る前に長い時間がかかる(おそらくそれにはかなりのCPUバウンドの作業があるため)。頻繁に新しい接続が必要な場合は、これを避ける必要があります。

ループを調整する重要な作業がないことが確実な場合は、追加のスレッドプールを節約できます。Taskしないでください。

あるいは、複数の受け入れを同時に実行することもできます。交換するawait Serve();(例えば)によって:

var serverTasks =
    Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Serve());
await Task.WhenAll(serverTasks);

これにより、スケーラビリティの問題が解消されます。ご了承くださいawaitここで1つを除いてすべてのエラーを飲み込みます。


1

やってみる

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    Task.Run(() => this.HandleConnection(client));
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}


  • これは最初の質問にどのように答えますか? - Leri
  • 私の元々の実装がやっているように、これは本質的にすべての接続に対して新しいスレッドを生み出さないでしょうか? - captncraig
  • タスクを使用してこれを実行したいと思いました...「スレッドプールサーバー」を使用してできることは何もないので。スレッド数が制限される可能性があります。各IO呼び出しには、ノンブロッキングAPIを使用する必要があります。 - Aron
  • 「接続ごとに新しいスレッドが生成されることは本質的にありません」。いいえ。それは新しいものを生み出しています仕事しかし、1つのタスクが1つと等しくない。 1つのスレッドが複数のタスクを処理でき、新しいスレッドがオンデマンドで生成されます。 - DasKrümelmonster
  • Task.Runは間違いなくではない新しいスレッドを開始するのと同じです。 Task.RunはThreadPoolからの既存のスレッドを使用しますが、Thread.Startは再利用されることのない新しいスレッドをスターします。 - Panagiotis Kanavos

1

マイクロソフトによるとhttp://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnTypevoid return typeは、例外をキャッチすることができないため、使用しないでください。あなたが指摘したように、あなたは「発射して忘れる」タスクを必要とします、それで私の結論はあなたが常にタスクを返す必要があるということです(マイクロソフトが言ったように)、しかし

TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);

私が証明として使用した例は以下のとおりです。

public static void Main()
{
    Awaitable()
        .ContinueWith(
            i =>
                {
                    foreach (var exception in i.Exception.InnerExceptions)
                    {
                        Console.WriteLine(exception.Message);
                    }
                },
            TaskContinuationOptions.OnlyOnFaulted);
    Console.WriteLine("This needs to come out before my exception");
    Console.ReadLine();
}

public static async Task Awaitable()
{
    await Task.Delay(3000);
    throw new Exception("Hey I can catch these pesky things");
}


0

非同期接続を受け入れる必要がある理由はありますか?つまり、クライアント接続を待つことで何か価値があるのでしょうか。これを行う唯一の理由は、接続を待っている間にサーバーで進行中の他の作業があるためです。もしあれば、おそらく次のようなことができます。

    public async void Serve()
    {
        while (true)
        {
            var client = await _listener.AcceptTcpClientAsync();
            Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
        }
    }

このようにして、受け入れは現在のスレッドを解放し、他のことを実行するためのオプションを残し、処理は新しいスレッドで実行されます。唯一のオーバーヘッドは、新しい接続の受け入れに戻る前に、クライアントを処理するための新しいスレッドを生成することです。

編集する ちょうどそれがあなたが書いたのとほぼ同じコードであることに気づいただけです。あなたが実際に尋ねていることをよりよく理解するために、私はあなたの質問をもう一度読む必要があると思います

編集2:

私のサーバーがまさしくその方法を使用するようにこれら二つのアプローチを組み合わせる方法はありますか      アクティブに実行されているタスクの数に必要なスレッドの数      IO操作で不必要にスレッドをブロックしませんか?

私の解決策が実際にこの質問に答えていると思います。それは本当に必要ですか?

編集3: Task.Factory.StartNew()で実際に新しいスレッドを作成しました。


  • 接続を非同期に受け入れることは、接続を待つ以外に何もしないようにスレッドを無駄にしないことによって価値を付加します。ほとんどの場合、接続を待つのは実際にリクエストを処理するよりもはるかに時間がかかります。また、Task.StartNewはではない新しいスレッドを生成しますがThreadPoolのスレッドを使用します - Panagiotis Kanavos
  • もちろん、他にやるべきことがあるのであれば、それは価値を付加します。したがって、私の最初の答えで私の質問です。ブロックされたスレッドと中断されたスレッドとの間に大きな違いがない限り、他に何もすることがないのであれば、私は実際に利益を得ることはできません。 TaskCreationOptions.LongRunningについてのヒントは新しいスレッドでこれを開始する必要がありますが、あなたはそのスレッドについて正しかったです。 - EMB

リンクされた質問


関連する質問

最近の質問