私は同期APIとスレッドプールを使って次のようなTCPサーバを開発しています。
TcpListener listener;
void Serve(){
while(true){
var client = listener.AcceptTcpClient();
ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
//Or alternatively new Thread(HandleConnection).Start(client)
}
}
私の目標は、できるだけ少ないリソース使用量でできるだけ多くの同時接続を処理することであると仮定すると、これは利用可能なスレッドの数によって急速に制限されるように思われます。ノンブロッキングタスクAPIを使用することで、少ないリソースでより多く処理できるようになると思います。
私の最初の印象は次のようなものです。
async Task Serve(){
while(true){
var client = await listener.AcceptTcpClientAsync();
HandleConnectionAsync(client); //fire and forget?
}
}
しかし、これがボトルネックの原因になる可能性があることは私を驚かせます。おそらく、HandleConnectionAsyncが最初の待ち状態になるまでに異常に長い時間がかかり、メインのacceptループが進行しなくなるでしょう。これで1つのスレッドしか使用されなくなるのでしょうか。それとも、ランタイムはそれが適していると考えるように複数のスレッド上で物事を魔法のように実行するでしょうか。
これらの2つのアプローチを組み合わせて、私のサーバーがアクティブに実行されているタスクの数に必要なスレッド数を正確に使用するようにしながら、IO操作で不要にスレッドをブロックしないようにする方法はありますか。
このような状況でスループットを最大化するための慣用的な方法はありますか?
プロファイリングテストで必要と思われる場合を除いて、フレームワークにスレッド処理を管理させ、余分なスレッドを作成しないようにします。特に電話がかかってきた場合HandleConnectionAsync
ほとんどIOに制限されています。
とにかく、呼び出しスレッド(ディスパッチャ)を解放したいのなら、HandleConnectionAsync
非常に簡単な解決策があります。あなたはから新しいスレッドにジャンプすることができますThreadPool
とawait Yield()
。これは、初期スレッド(コンソールアプリ、WCFサービス)に同期コンテキストがインストールされていない実行環境でサーバーを実行している場合に機能します。通常はTCPサーバーの場合です。
[編集済み]以下はこれを説明しています(コードはもともとここに)注意、メインwhile
loopは明示的にスレッドを作成しません。
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
class Program
{
object _lock = new Object(); // sync lock
List<Task> _connections = new List<Task>(); // pending connections
// The core server task
private async Task StartListener()
{
var tcpListener = TcpListener.Create(8000);
tcpListener.Start();
while (true)
{
var tcpClient = await tcpListener.AcceptTcpClientAsync();
Console.WriteLine("[Server] Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
// if already faulted, re-throw any error on the calling context
if (task.IsFaulted)
task.Wait();
}
}
// Register and handle the connection
private async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
// start the new connection task
var connectionTask = HandleConnectionAsync(tcpClient);
// add it to the list of pending task
lock (_lock)
_connections.Add(connectionTask);
// catch all errors of HandleConnectionAsync
try
{
await connectionTask;
// we may be on another thread after "await"
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
// remove pending task
lock (_lock)
_connections.Remove(connectionTask);
}
}
// Handle new connection
private async Task HandleConnectionAsync(TcpClient tcpClient)
{
await Task.Yield();
// continue asynchronously on another threads
using (var networkStream = tcpClient.GetStream())
{
var buffer = new byte[4096];
Console.WriteLine("[Server] Reading from client");
var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
Console.WriteLine("[Server] Client wrote {0}", request);
var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
Console.WriteLine("[Server] Response has been written");
}
}
// The entry point of the console app
static void Main(string[] args)
{
Console.WriteLine("Hit Ctrl-C to exit.");
new Program().StartListener().Wait();
}
}
あるいは、コードは以下のようになります。await Task.Yield()
。注意、私は渡しますあasync
λ〜λTask.Run
まだやりたいから内部の非同期APIの恩恵を受けるHandleConnectionAsync
そして使うawait
そこで:
// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
return Task.Run(async () =>
{
using (var networkStream = tcpClient.GetStream())
{
var buffer = new byte[4096];
Console.WriteLine("[Server] Reading from client");
var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
Console.WriteLine("[Server] Client wrote {0}", request);
var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
Console.WriteLine("[Server] Response has been written");
}
});
}
[更新]コメントに基づくと、これがライブラリコードになる場合、実行環境は実際には未知であり、デフォルト以外の同期コンテキストを持つ可能性があります。この場合は、メインサーバーループをプールスレッドで実行します(同期コンテキストはありません)。
private static Task StartListener()
{
return Task.Run(async () =>
{
var tcpListener = TcpListener.Create(8000);
tcpListener.Start();
while (true)
{
var tcpClient = await tcpListener.AcceptTcpClientAsync();
Console.WriteLine("[Server] Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
// if already faulted, re-throw any error on the calling context
if (task.IsFaulted)
task.Wait();
}
});
}
このようにして、内部に作成されたすべての子タスクStartListener
クライアントコードの同期コンテキストの影響を受けません。だから、私は呼び出す必要はないだろうTask.ConfigureAwait(false)
明示的にどこでも。
await Task.Yeild()
)それはsynだからです。 UIスレッドが使用するコンテキストPostMessage
これは、マウスやキーボードなどの他のユーザー入力メッセージを引き継ぎ、UIをブロックする可能性があるためです。そのすべてがコンテキストフリー環境には当てはまりません。Task.Yield()
ただ使うThreadPool.QueueUserWorkItem
便利な近道です。もう少し情報:stackoverflow.com/q/20319769/1768303 - noseratioTask.Run
あなたのコードの中?私の理解しているように、これは主にのために待っている入出力操作です。TCPClient
データを受信します。なぜ新しいものがあるべきなのかThreadPool
のための発砲while (true)
ループ? - Yuval Itzchakovawait tcpListener.AcceptTcpClientAsync().ConfigureAwait(false)
。 - noseratio
既存の回答は正しく使用することを提案していますTask.Run(() => HandleConnection(client));
しかし、なぜ説明されていません。
理由は次のとおりです。HandleConnectionAsync
最初の待ち合わせには時間がかかることがあります。あなたが非同期IOを使うことに固執するなら(あなたがこのケースでそうするべきであるように)これはそれを意味しますHandleConnectionAsync
ブロッキングすることなくCPUバウンドの作業を行っています。これはスレッドプールに最適なケースです。それは短く、ノンブロッキングのCPU作業を実行するために作られています。
そして、あなたは正しい、acceptループは次のように調整されるでしょう。HandleConnectionAsync
戻る前に長い時間がかかる(おそらくそれにはかなりのCPUバウンドの作業があるため)。頻繁に新しい接続が必要な場合は、これを避ける必要があります。
ループを調整する重要な作業がないことが確実な場合は、追加のスレッドプールを節約できます。Task
しないでください。
あるいは、複数の受け入れを同時に実行することもできます。交換するawait Serve();
(例えば)によって:
var serverTasks =
Enumerable.Range(0, Environment.ProcessorCount)
.Select(_ => Serve());
await Task.WhenAll(serverTasks);
これにより、スケーラビリティの問題が解消されます。ご了承くださいawait
ここで1つを除いてすべてのエラーを飲み込みます。
やってみる
TcpListener listener;
void Serve(){
while(true){
var client = listener.AcceptTcpClient();
Task.Run(() => this.HandleConnection(client));
//Or alternatively new Thread(HandleConnection).Start(client)
}
}
マイクロソフトによるとhttp://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnTypevoid return typeは、例外をキャッチすることができないため、使用しないでください。あなたが指摘したように、あなたは「発射して忘れる」タスクを必要とします、それで私の結論はあなたが常にタスクを返す必要があるということです(マイクロソフトが言ったように)、しかし
TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);
私が証明として使用した例は以下のとおりです。
public static void Main()
{
Awaitable()
.ContinueWith(
i =>
{
foreach (var exception in i.Exception.InnerExceptions)
{
Console.WriteLine(exception.Message);
}
},
TaskContinuationOptions.OnlyOnFaulted);
Console.WriteLine("This needs to come out before my exception");
Console.ReadLine();
}
public static async Task Awaitable()
{
await Task.Delay(3000);
throw new Exception("Hey I can catch these pesky things");
}
非同期接続を受け入れる必要がある理由はありますか?つまり、クライアント接続を待つことで何か価値があるのでしょうか。これを行う唯一の理由は、接続を待っている間にサーバーで進行中の他の作業があるためです。もしあれば、おそらく次のようなことができます。
public async void Serve()
{
while (true)
{
var client = await _listener.AcceptTcpClientAsync();
Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
}
}
このようにして、受け入れは現在のスレッドを解放し、他のことを実行するためのオプションを残し、処理は新しいスレッドで実行されます。唯一のオーバーヘッドは、新しい接続の受け入れに戻る前に、クライアントを処理するための新しいスレッドを生成することです。
編集する ちょうどそれがあなたが書いたのとほぼ同じコードであることに気づいただけです。あなたが実際に尋ねていることをよりよく理解するために、私はあなたの質問をもう一度読む必要があると思います
編集2:
私のサーバーがまさしくその方法を使用するようにこれら二つのアプローチを組み合わせる方法はありますか アクティブに実行されているタスクの数に必要なスレッドの数 IO操作で不必要にスレッドをブロックしませんか?
私の解決策が実際にこの質問に答えていると思います。それは本当に必要ですか?
編集3: Task.Factory.StartNew()で実際に新しいスレッドを作成しました。