2 回答
TA贡献1777条经验 获得超3个赞
这是另一种方法。此实现在概念上与 alexm 的实现相同,直到没有频道立即可用的项目为止。然后它的不同之处在于避免了Task.WhenAny
循环模式,而是为每个通道启动一个异步循环。所有循环都在竞相更新共享变量,该共享变量在临界区更新,以防止从多个通道消耗元素。ValueTuple<T, int, bool>
consumed
/// <summary>
/// Takes an item asynchronously from any one of the specified channel readers.
/// </summary>
public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(
ChannelReader<T>[] channelReaders,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(channelReaders);
if (channelReaders.Length == 0) throw new ArgumentException(
$"The {nameof(channelReaders)} argument is a zero-length array.");
foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(
$"The {nameof(channelReaders)} argument contains at least one null element.");
cancellationToken.ThrowIfCancellationRequested();
// Fast path (at least one channel has an item available immediately)
for (int i = 0; i < channelReaders.Length; i++)
if (channelReaders[i].TryRead(out var item))
return (item, i);
// Slow path (all channels are currently empty)
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
(T Item, int Index, bool HasValue) consumed = default;
Task[] tasks = channelReaders.Select(async (channelReader, index) =>
{
while (true)
{
try
{
if (!await channelReader.WaitToReadAsync(linkedCts.Token)
.ConfigureAwait(false)) break;
}
// Only the exceptional cases below are normal.
catch (OperationCanceledException)
when (linkedCts.IsCancellationRequested) { break; }
catch when (channelReader.Completion.IsCompleted
&& !channelReader.Completion.IsCompletedSuccessfully) { break; }
// This channel has an item available now.
lock (linkedCts)
{
if (consumed.HasValue)
return; // An item has already been consumed from another channel.
if (!channelReader.TryRead(out var item))
continue; // We lost the race to consume the available item.
consumed = (item, index, true); // We consumed an item successfully.
}
linkedCts.Cancel(); // Cancel the other tasks.
return;
}
}).ToArray();
// The tasks should never fail. If a task ever fails, we have a bug.
try { foreach (var task in tasks) await task.ConfigureAwait(false); }
catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }
if (consumed.HasValue)
return (consumed.Item, consumed.Index);
cancellationToken.ThrowIfCancellationRequested();
Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));
throw new ChannelClosedException();
}
应该注意的是,这个解决方案,以及 alexm 的解决方案,都依赖于WaitToReadAsync
在一个元素被消耗时取消所有挂起的操作。不幸的是,这会触发臭名昭著的内存泄漏问题,该问题会影响具有空闲生产者的 .NET 通道。当取消通道上的任何异步操作时,取消的操作将保留在内存中,附加到通道的内部结构,直到将元素写入通道。此行为已被Microsoft 分类为设计使然,但不排除改进它的可能性。有趣的是,这种歧义使得这种效果不符合记录条件. 因此,了解这一点的唯一途径是偶然,要么从非官方渠道阅读,要么陷入其中。
TA贡献1921条经验 获得超9个赞
如果按照 Go 中使用通道的方式使用通道,问题就会容易得多:Channel(Readers) 作为输入,Channel(Readers) 作为输出。
IEnumerable<ChannelReader<T>> sources=....;
await foreach(var msg in sources.TakeFromAny(token))
{
....
}
要么
var merged=sources.TakeFromAny(token);
...
var msg=await merged.ReadAsync(token);
在这种情况下,来自所有通道阅读器的输入被复制到一个输出通道。该方法的返回值是该频道的ChannelReader。
CopyToAsync 助手
可以使用CopyToAsync函数将消息从输入源复制到输出通道:
static async Task CopyToAsync<T>(
this ChannelReader<T> input,
ChannelWriter<T> output,
CancellationToken token=default)
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
//Early exit if cancellation is requested
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
await output.WriteAsync(msg,token);
}
}
}
此代码类似于ReadAllAsync,但如果请求取消则立即退出。ReadAllAsync即使要求取消,也将退还所有可用物品。使用的方法包括
WriteAsync如果通道关闭则不会抛出异常,这使得错误处理变得更加容易。
错误处理和面向铁路的编程
WaitToReadAsync如果源出错但该异常确实会抛出,并且该异常将传播到调用方法并传播到Task.WhenAll输出通道。
这可能有点混乱,因为它会中断整个管道。为避免这种情况,可以将错误吞没或记录在内部CopyToAsync。一个更好的选择是使用面向铁路的编程并将所有消息包装在一个Result<TMsg,TError>类中,例如:
static async Task CopyToAsync<Result<T,Exception>>(
this ChannelReader<Result<T,Exception>> input,
ChannelWriter<Result<T,Exception>> output,
CancellationToken token=default)
{
try
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
//Early exit if cancellation is requested
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
var newMsg=Result.FromValue(msg);
await output.WriteAsync(newMsg,token);
}
}
}
catch(Exception exc)
{
output.TryWrite(Result<T>.FromError(exc));
}
}
TakeFromAsync
TakeFromAny(MergeAsync可能是更好的名字)可以是:
static ChannelReader<T> TakeFromAny(
this IEnumerable<ChannelReader<T> inputs,
CancellationToken token=default)
{
var outChannel=Channel.CreateBounded<T>(1);
var readers=inputs.Select(rd=>CopyToAsync(rd,outChannel,token));
_ = Task.WhenAll(readers)
.ContinueWith(t=>outChannel.TryComplete(t.Exception));
return outChannel;
}
使用 1 的有界容量可确保下游代码的背压行为不会改变。
添加源索引
这也可以调整为发出源的索引:
static async Task CopyToAsync<T>(
this ChannelReader<T> input,int index,
ChannelWriter<(T,int)> output,
CancellationToken token=default)
{
while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (!token.IsCancellationRequested && input.TryRead(out T? msg))
{
await output.WriteAsync((msg,index),token);
}
}
}
static ChannelReader<(T,int)> TakeFromAny(
this IEnumerable<ChannelReader<T> inputs,
CancellationToken token=default)
{
var outChannel=Channel.CreateBounded<(int,T)>(1);
var readers=inputs.Select((rd,idx)=>CopyToAsync(rd,idx,outChannel,token));
_ = Task.WhenAll(readers)
.ContinueWith(t=>outChannel.TryComplete(t.Exception));
return outChannel;
}
- 2 回答
- 0 关注
- 112 浏览
添加回答
举报