为了账号安全,请及时绑定邮箱和手机立即绑定

如何实现 Channels 的 BlockingCollection.TakeFromAny

如何实现 Channels 的 BlockingCollection.TakeFromAny

Go
不负相思意 2022-12-19 21:23:00
我正在尝试实现一个异步方法,该方法采用ChannelReader<T>s 数组,并从任何具有可用项目的通道中获取值。它是一种与具有以下签名的方法具有相似功能的BlockingCollection<T>.TakeFromAny方法:public static int TakeFromAny(BlockingCollection<T>[] collections, out T item,    CancellationToken cancellationToken);此方法返回collections从中删除项目的数组中的索引。async方法不能有参数out,所以我要实现的 API 是这样的:public static Task<(T Item, int Index)> TakeFromAnyAsync<T>(    ChannelReader<T>[] channelReaders,    CancellationToken cancellationToken = default);该方法应该异步读取一个项目,并返回消耗的项目以及数组TakeFromAnyAsync<T>中关联通道的索引。channelReaders如果所有通道都已完成(成功或出错),或者在 期间全部完成await,则该方法应异步抛出一个ChannelClosedException.我的问题是:如何实施该TakeFromAnyAsync<T>方法?实现看起来很棘手。很明显,在任何情况下,该方法都不应从通道中消耗多个项目。此外,它不应遗留即发即弃的任务,或让一次性资源未被处置。该方法通常会在循环中调用,因此它也应该相当高效。它的复杂度应该不比 O(n) 差,其中n通道的数量。要了解此方法的用处,您可以查看Go语言的select语句。从旅游:该select语句让 goroutine 等待多个通信操作。Aselect阻塞直到它的一个 case 可以运行,然后它执行那个 case。如果多个准备就绪,它会随机选择一个。select {case msg1 := <-c1:    fmt.Println("received", msg1)case msg2 := <-c2:    fmt.Println("received", msg2)}在上面的示例中,要么从通道中获取一个值c1并分配给变量msg1,要么从通道中获取一个值c2并分配给变量msg2。Goselect语句不限于从通道读取。它可以包括多种异构情况,如写入有界通道、等待计时器等。复制 Goselect语句的全部功能超出了这个问题的范围。
查看完整描述

2 回答

?
慕森王

TA贡献1777条经验 获得超3个赞

这是另一种方法。此实现在概念上与 alexm 的实现相同,直到没有频道立即可用的项目为止。然后它的不同之处在于避免了Task.WhenAny循环模式,而是为每个通道启动一个异步循环。所有循环都在竞相更新共享变量,该共享变量在临界区更新,以防止从多个通道消耗元素。ValueTuple<T, int, bool> consumed

/// <summary>

/// Takes an item asynchronously from any one of the specified channel readers.

/// </summary>

public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(

    ChannelReader<T>[] channelReaders,

    CancellationToken cancellationToken = default)

{

    ArgumentNullException.ThrowIfNull(channelReaders);

    if (channelReaders.Length == 0) throw new ArgumentException(

        $"The {nameof(channelReaders)} argument is a zero-length array.");

    foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(

        $"The {nameof(channelReaders)} argument contains at least one null element.");


    cancellationToken.ThrowIfCancellationRequested();


    // Fast path (at least one channel has an item available immediately)

    for (int i = 0; i < channelReaders.Length; i++)

        if (channelReaders[i].TryRead(out var item))

            return (item, i);


    // Slow path (all channels are currently empty)

    using var linkedCts = CancellationTokenSource

        .CreateLinkedTokenSource(cancellationToken);


    (T Item, int Index, bool HasValue) consumed = default;


    Task[] tasks = channelReaders.Select(async (channelReader, index) =>

    {

        while (true)

        {

            try

            {

                if (!await channelReader.WaitToReadAsync(linkedCts.Token)

                    .ConfigureAwait(false)) break;

            }

            // Only the exceptional cases below are normal.

            catch (OperationCanceledException)

                when (linkedCts.IsCancellationRequested) { break; }

            catch when (channelReader.Completion.IsCompleted

                && !channelReader.Completion.IsCompletedSuccessfully) { break; }


            // This channel has an item available now.

            lock (linkedCts)

            {

                if (consumed.HasValue)

                    return; // An item has already been consumed from another channel.


                if (!channelReader.TryRead(out var item))

                    continue; // We lost the race to consume the available item.


                consumed = (item, index, true); // We consumed an item successfully.

            }

            linkedCts.Cancel(); // Cancel the other tasks.

            return;

        }

    }).ToArray();


    // The tasks should never fail. If a task ever fails, we have a bug.

    try { foreach (var task in tasks) await task.ConfigureAwait(false); }

    catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }


    if (consumed.HasValue)

        return (consumed.Item, consumed.Index);

    cancellationToken.ThrowIfCancellationRequested();

    Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));

    throw new ChannelClosedException();

}

应该注意的是,这个解决方案,以及 alexm 的解决方案,都依赖于WaitToReadAsync在一个元素被消耗时取消所有挂起的操作。不幸的是,这会触发臭名昭著的内存泄漏问题,该问题会影响具有空闲生产者的 .NET 通道。当取消通道上的任何异步操作时,取消的操作将保留在内存中,附加到通道的内部结构,直到将元素写入通道。此行为已被Microsoft 分类为设计使然,但不排除改进它的可能性。有趣的是,这种歧义使得这种效果不符合记录条件. 因此,了解这一点的唯一途径是偶然,要么从非官方渠道阅读,要么陷入其中。



查看完整回答
反对 回复 2022-12-19
?
郎朗坤

TA贡献1921条经验 获得超9个赞

如果按照 Go 中使用通道的方式使用通道,问题就会容易得多:Channel(Readers) 作为输入,Channel(Readers) 作为输出。


IEnumerable<ChannelReader<T>> sources=....;

await foreach(var msg in sources.TakeFromAny(token))

{

....

}

要么


var merged=sources.TakeFromAny(token);

...

var msg=await merged.ReadAsync(token);

在这种情况下,来自所有通道阅读器的输入被复制到一个输出通道。该方法的返回值是该频道的ChannelReader。


CopyToAsync 助手


可以使用CopyToAsync函数将消息从输入源复制到输出通道:


static async Task CopyToAsync<T>(

        this ChannelReader<T> input,

        ChannelWriter<T> output,

        CancellationToken token=default)

{

   while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

   {

         //Early exit if cancellation is requested

         while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

         {

             await output.WriteAsync(msg,token);

         }

   }

}

此代码类似于ReadAllAsync,但如果请求取消则立即退出。ReadAllAsync即使要求取消,也将退还所有可用物品。使用的方法包括


WriteAsync如果通道关闭则不会抛出异常,这使得错误处理变得更加容易。


错误处理和面向铁路的编程


WaitToReadAsync如果源出错但该异常确实会抛出,并且该异常将传播到调用方法并传播到Task.WhenAll输出通道。


这可能有点混乱,因为它会中断整个管道。为避免这种情况,可以将错误吞没或记录在内部CopyToAsync。一个更好的选择是使用面向铁路的编程并将所有消息包装在一个Result<TMsg,TError>类中,例如:


static async Task CopyToAsync<Result<T,Exception>>(

        this ChannelReader<Result<T,Exception>> input,

        ChannelWriter<Result<T,Exception>> output,

        CancellationToken token=default)

{

   try

   {

     while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

     {

         //Early exit if cancellation is requested

         while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

         {

             var newMsg=Result.FromValue(msg);

             await output.WriteAsync(newMsg,token);

         }

     }

  }

  catch(Exception exc)

  {

    output.TryWrite(Result<T>.FromError(exc));

  }

}

TakeFromAsync


TakeFromAny(MergeAsync可能是更好的名字)可以是:


static ChannelReader<T> TakeFromAny(

        this IEnumerable<ChannelReader<T> inputs,

        CancellationToken token=default)

{

    var outChannel=Channel.CreateBounded<T>(1);


    var readers=inputs.Select(rd=>CopyToAsync(rd,outChannel,token));


    _ = Task.WhenAll(readers)

            .ContinueWith(t=>outChannel.TryComplete(t.Exception));

    return outChannel;

}

使用 1 的有界容量可确保下游代码的背压行为不会改变。


添加源索引


这也可以调整为发出源的索引:


static async Task CopyToAsync<T>(

        this ChannelReader<T> input,int index,

        ChannelWriter<(T,int)> output,

        CancellationToken token=default)

{

  while (await input.WaitToReadAsync(cancellationToken).ConfigureAwait(false))

  {

        while (!token.IsCancellationRequested &&  input.TryRead(out T? msg))

        {


            await output.WriteAsync((msg,index),token);

        }

  }

}


static ChannelReader<(T,int)> TakeFromAny(

        this IEnumerable<ChannelReader<T> inputs,

        CancellationToken token=default)

{

    var outChannel=Channel.CreateBounded<(int,T)>(1);


    var readers=inputs.Select((rd,idx)=>CopyToAsync(rd,idx,outChannel,token));


    _ = Task.WhenAll(readers)

            .ContinueWith(t=>outChannel.TryComplete(t.Exception));

    return outChannel;

}


查看完整回答
反对 回复 2022-12-19
  • 2 回答
  • 0 关注
  • 112 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信