I have a class Receiver with an ActionBlock:
    public class Receiver<T> : IReceiver<T>
    {
        private ActionBlock<T> _receiver;

        public Task<bool> Send(T item)
        {
            if (_receiver != null)
                return _receiver.SendAsync(item);
            // Do some other stuff here
        }

        public void Register(Func<T, Task> receiver)
        {
            _receiver = new ActionBlock<T>(receiver);
        }
        //...
    }
The action registered with the ActionBlock is an async method containing an await statement:
    private static async Task Writer(int num)
    {
        Console.WriteLine("start " + num);
        await Task.Delay(500);
        Console.WriteLine("end " + num);
    }
What I want to do now is wait synchronously (if a condition is set) until the action method has finished, to get exclusive behavior:
    var receiver = new Receiver<int>();
    receiver.Register((Func<int, Task>) Writer);
    receiver.Send(5).Wait(); // does not wait for the awaited action here!
The problem is that once the "await Task.Delay(500);" statement is reached, "receiver.Send(5).Wait();" no longer waits.
I have tried several variants (TaskCompletionSource, ContinueWith, ...), but none of them worked.
Does anyone have an idea how to solve this problem?
ActionBlock by default will enforce exclusive behavior (only one item is processed at a time). If you mean something else by "exclusive behavior", you can use TaskCompletionSource to notify your sender when the action is complete:
    // ... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
    var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
    receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task>) Writer);
    var tcs = new TaskCompletionSource<object>();
    receiver.Send(Tuple.Create(5, tcs));
    tcs.Task.Wait(); // if you must

    // The block delivers a single Tuple, so Writer unpacks it rather than taking two parameters.
    private static async Task Writer(Tuple<int, TaskCompletionSource<object>> item)
    {
        int num = item.Item1;
        TaskCompletionSource<object> tcs = item.Item2;
        Console.WriteLine("start " + num);
        await Task.Delay(500);
        Console.WriteLine("end " + num);
        tcs.SetResult(null); // signal the sender that processing is complete
    }
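As a rough sketch, the tuple plumbing could also be hidden behind a small wrapper so callers only see the item. The names CompletingReceiver and SendAndWaitAsync are hypothetical and not part of the original code. The sketch assumes a console application (no synchronization context) with System.Threading.Tasks.Dataflow available, which may require the NuGet package depending on the target framework. It also reports handler exceptions to the sender, which the snippet above does not do.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper (not from the original post): pairs each item with a
// TaskCompletionSource so the sender can await the end of processing.
public sealed class CompletingReceiver<T>
{
    private readonly ActionBlock<Tuple<T, TaskCompletionSource<object>>> _block;

    public CompletingReceiver(Func<T, Task> handler)
    {
        _block = new ActionBlock<Tuple<T, TaskCompletionSource<object>>>(async entry =>
        {
            try
            {
                await handler(entry.Item1);
                entry.Item2.TrySetResult(null);
            }
            catch (Exception ex)
            {
                // Report the failure to the sender instead of faulting the block.
                entry.Item2.TrySetException(ex);
            }
        });
    }

    // Completes when the handler has finished this item, not merely when
    // the block has accepted it.
    public async Task SendAndWaitAsync(T item)
    {
        var tcs = new TaskCompletionSource<object>();
        bool accepted = await _block.SendAsync(Tuple.Create(item, tcs));
        if (!accepted)
            throw new InvalidOperationException("The block declined the item.");
        await tcs.Task;
    }
}

public static class Program
{
    private static async Task Writer(int num)
    {
        Console.WriteLine("start " + num);
        await Task.Delay(500);
        Console.WriteLine("end " + num);
    }

    public static void Main()
    {
        var receiver = new CompletingReceiver<int>(Writer);
        receiver.SendAndWaitAsync(5).Wait(); // blocks until "end 5" has been printed
        Console.WriteLine("done");
    }
}
```

Blocking with Wait() is fine in a console application like this one, but in a UI context it can deadlock, so awaiting SendAndWaitAsync is usually the safer choice there.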
Alternatively, you could use AsyncLock (included in my AsyncEx library):
    private static AsyncLock mutex = new AsyncLock();

    private static async Task Writer(int num)
    {
        using (await mutex.LockAsync())
        {
            Console.WriteLine("start " + num);
            await Task.Delay(500);
            Console.WriteLine("end " + num);
        }
    }
AsyncLock. See updated answer for a code sample. You no longer know when an item is done processing, but each item will be processed one at a time (including async processing). - Stephen Cleary
_receiver into a TransformBlock and put the following action into a new ActionBlock, linked to _receiver? - svick
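One possible reading of this suggestion, sketched under assumptions: the async work moves into a TransformBlock, and whatever must happen afterwards moves into an ActionBlock linked to it, so the follow-up for an item only runs once the async work for that item has finished. The follow-up action and the class name PipelineSketch are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    public static void Main()
    {
        // Stage 1: the async work; the item is passed on only after it completes.
        var writer = new TransformBlock<int, int>(async num =>
        {
            Console.WriteLine("start " + num);
            await Task.Delay(500);
            Console.WriteLine("end " + num);
            return num;
        });

        // Stage 2: hypothetical follow-up action for each finished item.
        var followUp = new ActionBlock<int>(num => Console.WriteLine("after " + num));

        // Forward items and completion from stage 1 to stage 2.
        writer.LinkTo(followUp, new DataflowLinkOptions { PropagateCompletion = true });

        writer.Post(5);
        writer.Post(6);
        writer.Complete();

        // Wait until both stages have drained.
        followUp.Completion.Wait();
    }
}
```

Both blocks process one item at a time by default, but they run concurrently with each other, so the follow-up for one item may interleave with the start of the next.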