为了账号安全,请及时绑定邮箱和手机立即绑定

如何以同步方式合并两个 TPL DataFlow 管道?

如何以同步方式合并两个 TPL DataFlow 管道?

Go
吃鸡游戏 2022-12-24 13:55:07
我想编写一个应用程序来评估来自两个传感器的传感器数据。两个传感器都以 Package 对象的形式发送数据,每个 Package 再被拆分为多个 Frame 对象。Package 本质上是一个 Tuple<Timestamp, Data[]>,Frame 则是一个 Tuple<Timestamp, Data>。然后,我需要始终从两个数据源中取出时间戳最早的 Frame 进行处理。所以基本上我的对象流是:

    Package -(1:n)-> Frame \
                            }-pair synchronized-> Tuple<Frame, Frame>
    Package -(1:n)-> Frame /

例子

假设每个 Package 包含 2 或 3 个值(现实:5-7 个),时间戳是每次递增 1 的整数(现实:约 200Hz,即约 5ms 的增量)。为了简单起见,“数据”就是 timestamp * 100。

Packages (timestamp, values[])

    Source 1: {(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]), (29, [2700, 2800, 2900]), ...}
    Source 2: {(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]), (26, [2400, 2500, 2600]), ...}

经过 (1:n) 步骤后:

Frames (timestamp, value)

    Source 1: {(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800), (29, 2900), ...}
    Source 2: {(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

经过 pair synchronized 步骤后:

Merged tuples (timestamp, source1, source2)

    {(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800), (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null), (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

请注意,时间戳 23 缺失,因为两个源都没有发送该时间戳的值。这只是一个副作用:是否为它放入一个空元组都无关紧要。同样,元组写成 (27, 2700, 2700) 还是 ((27, 2700), (27, 2700)) 也没有关系,即 Tuple<Timestamp, Data, Data> 或 Tuple<Frame, Frame> 均可。
查看完整描述

2 回答

?
牛魔王的故事

TA贡献1830条经验 获得超3个赞

TPL DataFlow API 的问题在于,其中的一切几乎都是内部(internal)/私有(private)和/或密封(sealed)的。这使得您几乎没有扩展该 API 的余地。


无论如何,对于您的问题,实现一个新的 SynchronizedJoinBlock 类可能是个好主意。实际的业务逻辑位于 GetMessagesRecursive 方法中:


    public sealed class SynchronizedJoinBlock<T1, T2>

        : IReceivableSourceBlock<Tuple<T1, T2>>

    {

        private readonly object _syncObject = new object();

        private readonly Func<T1, T2, int> _compareFunction;

        private readonly Queue<T1> _target1Messages;

        private readonly Queue<T2> _target2Messages;

        private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;

        private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;

        private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;

        private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;


        public ITargetBlock<T1> Target1 => _target1;


        public ITargetBlock<T2> Target2 => _target2;


        public Task Completion => _transformManyBlock.Completion;


        public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)

        {

            _compareFunction = compareFunction

                ?? throw new ArgumentNullException(nameof(compareFunction));

            _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);

            _target1Messages = new Queue<T1>();

            _target2Messages = new Queue<T2>();


            Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>

            {

                lock (_syncObject)

                {

                    if (_target1Messages.Count > 0 && _target2Messages.Count > 0)

                    {

                        return GetMessagesRecursive(_target1Messages.Peek(), _target2Messages.Peek()).ToArray();

                    }

                    else

                    {

                        return new Tuple<T1, T2>[0];

                    }

                }

            };


            _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>

            {

                _target1Messages.Enqueue(element);

                return getMessagesFunction();

            });

            _target1.LinkTo(_batchedJoinBlock.Target1, new DataflowLinkOptions() { PropagateCompletion = true });


            _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>

            {

                _target2Messages.Enqueue(element);

                return getMessagesFunction();

            });

            _target2.LinkTo(_batchedJoinBlock.Target2, new DataflowLinkOptions() { PropagateCompletion = true });


            _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(

                element => element.Item1.Concat(element.Item2)

            );

            _batchedJoinBlock.LinkTo(_transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });

        }


        private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)

        {

            int result = _compareFunction(value1, value2);

            if (result == 0)

            {

                yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());

            }

            else if (result < 0)

            {

                yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));


                if (_target1Messages.Count > 0)

                {

                    foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))

                    {

                        yield return item;

                    }

                }

            }

            else

            {

                yield return Tuple.Create(default(T1), _target2Messages.Dequeue());


                if (_target2Messages.Count > 0)

                {

                    foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))

                    {

                        yield return item;

                    }

                }

            }

        }


        public void Complete()

        {

            _target1.Complete();

            _target2.Complete();

        }


        Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(

            DataflowMessageHeader messageHeader,

            ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)

        {

            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)

                .ConsumeMessage(messageHeader, target, out messageConsumed);

        }


        void IDataflowBlock.Fault(Exception exception)

        {

            ((IDataflowBlock)_transformManyBlock).Fault(exception);

        }


        public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,

            DataflowLinkOptions linkOptions)

        {

            return _transformManyBlock.LinkTo(target, linkOptions);

        }


        void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(

            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)

        {

            ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)

                .ReleaseReservation(messageHeader, target);

        }


        bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(

            DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)

        {

            return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)

                .ReserveMessage(messageHeader, target);

        }


        public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)

        {

            return _transformManyBlock.TryReceive(filter, out item);

        }


        public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)

        {

            return _transformManyBlock.TryReceiveAll(out items);

        }

    }



查看完整回答
反对 回复 2022-12-24
?
叮当猫咪

TA贡献1776条经验 获得超12个赞

这是一个 SynchronizedJoinBlock 块的实现,类似于 Hardy Hobeck 的回答中提供的那个。这个实现处理了一些次要的细节,例如取消、异常处理,以及在输入块 Target1 和 Target2 被标记为已完成时传播剩余的项目。此外,合并逻辑不涉及递归,这应该会使其性能更好(但愿如此,我没有实际测量过),并且不易发生堆栈溢出异常。小偏差:输出是 ValueTuple<T1, T2> 而不是 Tuple<T1, T2>(目的是减少内存分配)。


/// <summary>
/// A source block that merges two input streams into value tuples using a
/// caller-supplied comparison. While both internal queues hold items, the two heads
/// are compared: equal heads are emitted together, otherwise the lower head is
/// emitted alone with <c>default</c> in the other slot. When both inputs complete
/// successfully, whatever is left in the queues is emitted unpaired.
/// </summary>
/// <typeparam name="T1">Type of the items posted to <see cref="Target1"/>.</typeparam>
/// <typeparam name="T2">Type of the items posted to <see cref="Target2"/>.</typeparam>
public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    // Guards both queues; the two input blocks run independently of each other.
    private readonly object _locker = new object();

    /// <summary>
    /// Creates the block and links its two inputs to the output.
    /// </summary>
    /// <param name="comparison">
    /// Compares the heads of both streams: negative when the first-stream item comes
    /// first, positive when the second-stream item comes first, zero for a match.
    /// </param>
    /// <param name="cancellationToken">Token passed to all three internal blocks.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="comparison"/> is <c>null</c>.
    /// </exception>
    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };

        // If ANY input block fails, then the whole block has failed.
        // Each input is observed on its own: watching only the first input to
        // complete would miss a fault in the other one and leave the output
        // (and therefore Completion) pending forever.
        foreach (var inputTask in inputTasks)
        {
            inputTask.ContinueWith(t =>
            {
                ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
                // Completing an already completed block is a no-op.
                _input1.Complete();
                _input2.Complete();
                ClearQueues();
            }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted |
                TaskContinuationOptions.RunContinuationsAsynchronously,
                TaskScheduler.Default);
        }

        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, CancellationToken.None, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    /// <summary>Target that receives the items of the first stream.</summary>
    public ITargetBlock<T1> Target1 => _input1;

    /// <summary>Target that receives the items of the second stream.</summary>
    public ITargetBlock<T2> Target2 => _input2;

    /// <summary>Completion of the output buffer of this block.</summary>
    public Task Completion => _output.Completion;

    // Handler of the first input block: queue the item, then emit what can be emitted.
    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    // Handler of the second input block: queue the item, then emit what can be emitted.
    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    // Emits tuples while both queues are non-empty. "Unsafe": the caller must hold _locker.
    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    // Emits every queued item unpaired; used once both inputs have completed.
    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    // Drops all queued items.
    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    /// <summary>
    /// Completes the output and both inputs. The output is completed first, so no
    /// further tuples are produced; completing the inputs stops them from accepting
    /// and queueing items that could never be emitted.
    /// </summary>
    public void Complete()
    {
        _output.Complete();
        _input1.Complete();
        _input2.Complete();
    }

    /// <summary>
    /// Faults the output and both inputs, so the inputs stop accepting items.
    /// </summary>
    /// <param name="exception">The exception that caused the fault.</param>
    public void Fault(Exception exception)
    {
        // Output first, so Completion reports the caller's exception.
        ((IDataflowBlock)_output).Fault(exception);
        ((IDataflowBlock)_input1).Fault(exception);
        ((IDataflowBlock)_input2).Fault(exception);
    }

    /// <inheritdoc/>
    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    /// <inheritdoc/>
    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    /// <inheritdoc/>
    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    /// <inheritdoc/>
    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    /// <inheritdoc/>
    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    /// <inheritdoc/>
    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}

使用示例:


// Pair the items of both streams by comparing their first tuple element.
var merger = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (left, right) => Comparer<int>.Default.Compare(left.Item1, right.Item1));

var firstStream = new (int, int)[]
{
    (17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100), (22, 2200),
    (25, 2500), (26, 2600), (27, 2700), (28, 2800), (29, 2900)
};

var secondStream = new (int, int)[]
{
    (15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900), (20, 2000),
    (21, 2100), (24, 2400), (25, 2500), (26, 2600)
};

// Feed each stream into its own target.
foreach (var item in firstStream)
{
    merger.Target1.Post(item);
}
foreach (var item in secondStream)
{
    merger.Target2.Post(item);
}

// Signal that no more input will arrive on either side.
merger.Target1.Complete();
merger.Target2.Complete();

// Print merged tuples until the block reports that no more output will come.
while (true)
{
    bool hasOutput = merger.OutputAvailableAsync().Result;
    if (!hasOutput)
    {
        break;
    }
    Console.WriteLine($"> Received: {merger.Receive()}");
}

输出:


> Received: ((0, 0), (15, 1500))

> Received: ((0, 0), (16, 1600))

> Received: ((17, 1700), (17, 1700))

> Received: ((18, 1800), (18, 1800))

> Received: ((19, 1900), (19, 1900))

> Received: ((20, 2000), (20, 2000))

> Received: ((21, 2100), (21, 2100))

> Received: ((22, 2200), (0, 0))

> Received: ((0, 0), (24, 2400))

> Received: ((25, 2500), (25, 2500))

> Received: ((26, 2600), (26, 2600))

> Received: ((27, 2700), (0, 0))

> Received: ((28, 2800), (0, 0))

> Received: ((29, 2900), (0, 0))


假定传入数据是有序的。


这个类与我之前在一个有些相关的问题中发布的 JoinDependencyBlock 类具有相似的结构。



查看完整回答
反对 回复 2022-12-24
  • 2 回答
  • 0 关注
  • 84 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号