19

I am working on a TCP server that looks something like this, using synchronous APIs and the thread pool:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

Assuming my goal is to handle as many concurrent connections as possible with the lowest resource usage, this seems like it will quickly be limited by the number of available threads. I suspect that by using non-blocking Task APIs, I will be able to handle many more connections with fewer resources.

My initial impression is something like:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

But it strikes me that this could cause bottlenecks. Perhaps HandleConnectionAsync will take an unusually long time to hit the first await, and will stop the main accept loop from proceeding. Will this only use one thread ever, or will the runtime magically run things on multiple threads as it sees fit?

Is there a way to combine these two approaches so that my server will use exactly the number of threads it needs for the number of actively running tasks, but so that it will not block threads unnecessarily on IO operations?

Is there an idiomatic way to maximize throughput in a situation like this?


  • async / await works off of the current thread, afaik. If you want to schedule them in the task pool, you'll need to explicitly use a Task. That is to say, your current implementation is purely single threaded. - cwharris
  • This may give you all the answers: Awaiting Socket Operations. - noseratio
  • @Noseratio, that answers an entirely different question, but I don't see how it addresses mine. That article deals with awaiting apis that expose different async patterns as if they exposed Task based apis. The TcpListener does indeed offer Task based apis. - captncraig
  • @captncraig, admittedly, I didn't understand the question initially. With better understanding, here are my thoughts. - noseratio

5 Answers


22

I'd let the Framework manage the threading and wouldn't create any extra threads unless profiling suggests I need to, especially if the calls inside HandleConnectionAsync are mostly IO-bound.

Anyway, if you'd like to release the calling thread (the dispatcher) at the beginning of HandleConnectionAsync, there's a very easy solution. You can jump onto a new ThreadPool thread with await Task.Yield(). That works if your server runs in an execution environment that has no synchronization context installed on the initial thread (a console app, a WCF service), which is normally the case for a TCP server.

[EDITED] The following illustrates this (the code is originally from here). Note that the main while loop doesn't create any threads explicitly:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                task.Wait();
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending tasks
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another thread

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static void Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        new Program().StartListener().Wait();
    }
}

Alternatively, the code might look like below, without await Task.Yield(). Note, I pass an async lambda to Task.Run, because I still want to benefit from async APIs inside HandleConnectionAsync and use await in there:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

[UPDATE] Based upon the comment: if this is going to be library code, the execution environment is indeed unknown and may have a non-default synchronization context. In this case, I'd rather run the main server loop on a pool thread (which is free of any synchronization context):

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                task.Wait();
        }
    });
}

This way, all child tasks created inside StartListener wouldn't be affected by the synchronization context of the client code. So, I wouldn't have to call Task.ConfigureAwait(false) anywhere explicitly. 


  • Very interesting. If I can't guarantee the environment that it will run in (this is in a library), it looks like the documentation for Task.Yield says not to count on the SynchronizationContext switching back properly. Task.Run may be my safest option. - captncraig
  • @captncraig, the docs mean to not use it for lengthy processing on a UI thread (like tight looping with await Task.Yield()). That's because the sync. context of the UI thread uses PostMessage for this deep inside, which may take priority over other user input messages, like mouse and keyboard, and block the UI. None of that applies to a context-free environment, where Task.Yield() just uses ThreadPool.QueueUserWorkItem, being a handy awaitable shortcut. Some more info: stackoverflow.com/q/20319769/1768303 - noseratio
  • @Noseratio Could you please expand on the use of Task.Run inside your code? As I understand it, this is primarily an I/O operation of waiting for the TcpClient to receive data. Why should a new ThreadPool thread be fired up for the while (true) loop? - Yuval Itzchakov
  • @YuvalItzchakov, it's there only to "get off the hook" of any synchronization context the client code may have on the calling thread. It's a one-time thing, and it can be replaced with await tcpListener.AcceptTcpClientAsync().ConfigureAwait(false). - noseratio
  • Great. Thanks for the explanation! - Yuval Itzchakov
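
The ConfigureAwait(false) alternative mentioned in the last reply can be sketched roughly as follows. This is a minimal illustration rather than the answer's code: the class name ConfigureAwaitAcceptDemo, the AcceptAsync helper, and the fixed connection count are assumptions made so the loop terminates, and it assumes C# 7.1 or later for async Main.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

static class ConfigureAwaitAcceptDemo
{
    // Hypothetical helper: accepts `count` connections, then returns how many were accepted.
    public static async Task<int> AcceptAsync(TcpListener listener, int count)
    {
        int accepted = 0;
        while (accepted < count)
        {
            // ConfigureAwait(false): do not resume on the caller's synchronization context
            using (var client = await listener.AcceptTcpClientAsync().ConfigureAwait(false))
            {
                accepted++;
            }
        }
        return accepted;
    }

    static async Task Main()
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: let the OS pick a free port
        listener.Start();
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        Task<int> server = AcceptAsync(listener, 2);
        for (int i = 0; i < 2; i++)
        {
            using (var c = new TcpClient())
                await c.ConnectAsync(IPAddress.Loopback, port);
        }

        Console.WriteLine(await server);
        listener.Stop();
    }
}
```

Only the accept await is configured here; awaits inside a connection handler would need the same treatment if they are reached while a synchronization context is still current.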

8

The existing answers have correctly proposed to use Task.Run(() => HandleConnection(client));, but not explained why.

Here's why: you are concerned that HandleConnectionAsync might take some time to hit the first await. If you stick to using async IO (as you should in this case), this means that HandleConnectionAsync is doing CPU-bound work without any blocking. This is a perfect case for the thread pool, which is made to run short, non-blocking CPU work.

And you are right that the accept loop would be throttled by HandleConnectionAsync taking a long time before returning (maybe because there is significant CPU-bound work in it). This is to be avoided if you need to accept new connections at a high rate.

If you are sure that there is no significant work throttling the loop, you can skip Task.Run and save the additional thread-pool task.
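
To make the throttling concrete, here is a small sketch (not taken from the question) that times how long the call itself takes to return. SlowStartAsync is a hypothetical handler whose Thread.Sleep stands in for CPU-bound work before the first await; the sketch assumes C# 7.1 or later for async Main and tuples.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class FirstAwaitDemo
{
    // Hypothetical handler: does synchronous work before its first await.
    static async Task SlowStartAsync()
    {
        Thread.Sleep(200);     // stands in for CPU-bound work; runs on the caller's thread
        await Task.Delay(10);  // control returns to the caller only at this point
    }

    // Measures how long each *call* takes to return, in milliseconds.
    public static async Task<(long direct, long offloaded)> MeasureAsync()
    {
        var sw = Stopwatch.StartNew();
        Task direct = SlowStartAsync();                    // caller is held up by the Sleep
        long directMs = sw.ElapsedMilliseconds;

        sw.Restart();
        Task offloaded = Task.Run(() => SlowStartAsync()); // work is queued to the thread pool
        long offloadedMs = sw.ElapsedMilliseconds;

        await Task.WhenAll(direct, offloaded);
        return (directMs, offloadedMs);
    }

    static async Task Main()
    {
        var (direct, offloaded) = await MeasureAsync();
        Console.WriteLine($"direct call returned after {direct} ms");
        Console.WriteLine($"Task.Run call returned after {offloaded} ms");
    }
}
```

The direct call should take roughly as long as the synchronous portion of the handler, while the Task.Run call should return almost immediately; in an accept loop, the same difference determines how soon the next AcceptTcpClientAsync can be issued.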

Alternatively, you can have multiple accepts running at the same time. Replace await Serve(); by (for example):

var serverTasks =
    Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Serve());
await Task.WhenAll(serverTasks);

This removes the scalability problems. Note that await will rethrow only one of the errors here; the others are swallowed.
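
If all of the errors matter, one option is to keep a reference to the task returned by Task.WhenAll and inspect its Exception property after the await throws. A minimal sketch, where FailingServe is a hypothetical stand-in for Serve() that always fails (C# 7.1 or later assumed for async Main):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class WhenAllErrorsDemo
{
    // Hypothetical stand-in for Serve(): fails after a short delay.
    static async Task FailingServe(int id)
    {
        await Task.Delay(10);
        throw new InvalidOperationException("listener " + id + " failed");
    }

    // Returns the number of exceptions recorded on the combined task.
    public static async Task<int> CountErrorsAsync()
    {
        Task all = Task.WhenAll(Enumerable.Range(0, 3).Select(i => FailingServe(i)));
        try
        {
            await all; // rethrows only one of the exceptions
        }
        catch (InvalidOperationException)
        {
            // all.Exception is an AggregateException that holds every failure
            foreach (var ex in all.Exception.InnerExceptions)
                Console.WriteLine(ex.Message);
        }
        return all.Exception.InnerExceptions.Count;
    }

    static async Task Main() => Console.WriteLine(await CountErrorsAsync());
}
```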


1

Try

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    Task.Run(() => this.HandleConnection(client));
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}


  • How does this answer initial question? - Leri
  • Isn't this essentially spawning a new thread for every connection, just like my original implementation is doing? - captncraig
  • I thought you wanted to do this using Tasks...as it stands there is nothing you can do with your "thread pool server" that can limit your number of threads. You need to use the non-blocking APIs for each of your IO calls. - Aron
  • "Isn't this essentially spawning a new thread for every connection" No. It is spawning a new Task, but one Task does not equal one Thread. One Thread can handle more than one Task and new Threads are spawned on demand. - DasKrümelmonster
  • Task.Run is definitely not the same as starting a new Thread. Task.Run will use an existing thread from the ThreadPool while Thread.Start will start a new thread that will never be reused - Panagiotis Kanavos
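
The difference described in the last two comments can be checked with Thread.CurrentThread.IsThreadPoolThread. A small sketch, with class and method names chosen only for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class PoolThreadDemo
{
    // True if the delegate passed to Task.Run executes on a thread-pool thread.
    public static bool RunsOnPoolViaTaskRun() =>
        Task.Run(() => Thread.CurrentThread.IsThreadPoolThread).GetAwaiter().GetResult();

    // True if the delegate passed to new Thread(...) executes on a thread-pool thread.
    public static bool RunsOnPoolViaNewThread()
    {
        bool onPool = true;
        var t = new Thread(() => onPool = Thread.CurrentThread.IsThreadPoolThread);
        t.Start();
        t.Join();
        return onPool;
    }

    static void Main()
    {
        Console.WriteLine(RunsOnPoolViaTaskRun());   // prints True
        Console.WriteLine(RunsOnPoolViaNewThread()); // prints False
    }
}
```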

1

According to the Microsoft documentation (http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType), the void return type shouldn't be used for async methods because the caller cannot catch exceptions thrown from them. As you have pointed out, you do need "fire and forget" tasks, so my conclusion is that you should always return Task (as Microsoft says), and catch the error using:

TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);

An example I used as proof is below:

public static void Main()
{
    Awaitable()
        .ContinueWith(
            i =>
                {
                    foreach (var exception in i.Exception.InnerExceptions)
                    {
                        Console.WriteLine(exception.Message);
                    }
                },
            TaskContinuationOptions.OnlyOnFaulted);
    Console.WriteLine("This needs to come out before my exception");
    Console.ReadLine();
}

public static async Task Awaitable()
{
    await Task.Delay(3000);
    throw new Exception("Hey I can catch these pesky things");
}


0

Is there any reason you need to accept connections asynchronously? I mean, does awaiting a client connection give you any value? The only reason for doing it would be that there is some other work going on in the server while waiting for a connection. If there is, you could probably do something like this:

    public async void Serve()
    {
        while (true)
        {
            var client = await _listener.AcceptTcpClientAsync();
            Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
        }
    }

This way, accepting will release the current thread, leaving room for other things to be done, and the handling runs on a new thread. The only overhead would be spawning a new thread to handle the client before going straight back to accepting a new connection.

Edit: Just realized it's almost the same code you wrote. Think I need to read your question again to better understand what you're actually asking :S

Edit2:

Is there a way to combine these two approaches so that my server will use exactly the number of threads it needs for the number of actively running tasks, but so that it will not block threads unnecessarily on IO operations?

I think my solution actually answers this question. Is it really necessary, though?

Edit3: Made Task.Factory.StartNew() actually create a new thread.


  • Accepting a connection asynchronously does add value by not wasting a thread to do nothing but wait for a connection. In most cases, waiting for a connection will take much longer than actually processing a request. Additionally, Task.Factory.StartNew does not spawn a new thread but uses a thread from the ThreadPool - Panagiotis Kanavos
  • Of course it adds value if there's other work to do, hence my question in my original answer. I don't really see the gain if there's no other work to do unless there's huge difference between a blocked thread and suspended thread. Hinting about TaskCreationOptions.LongRunning should start this on a new thread, but you were right about that one. - EMB
