草庐IT

c# - 将两个 Observable 与一个优先级更高的 Observable 合并

coder 2024-06-03 原文

是否可以使用 ReactiveExtensions 来实现以下目标;

  • 两个 Observable,一个是“高”优先级,另一个是“低”优先级

  • 将两个 Observable 合并为一个,然后可以对其进行订阅,目的是让这个生成的 Observable 始终先于任何低优先级的项目发出高优先级的项目。

我知道这可以使用两个 ConcurrentQueue 集合和类似的东西更简单地实现;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item);

但是这种方法有一些问题,比如不能像 Observable 那样“可订阅”(所以一旦队列用完,处理就会结束,没有很多额外的废话来把它推到任务中)。

此外,我对在队列上应用一些额外的过滤很感兴趣,例如节流和“在更改之前不同”,因此 Rx 似乎很适合这里。

最佳答案

你说的当然是优先队列。

Rx 是关于事件的,而不是队列。当然,队列 在 Rx 中被大量使用 - 但它们不是一流的概念,更多是 Rx 概念的实现细节的一部分。

我们需要队列的一个很好的例子是处理一个缓慢的观察者。事件在 Rx 中按顺序调度,如果事件到达的速度快于观察者可以处理的速度,那么它们必须针对该观察者排队。如果有很多观察者,则必须维护多个逻辑队列,因为观察者可能以不同的速度前进 - 并且 Rx 选择不让它们保持同步。

“背压”是观察者向可观察对象提供反馈的概念,以便允许机制处理更快可观察对象的压力——例如合并或节流。 Rx 没有引入背压的一流方法 - 唯一内置的方法是通过 OnNext 的同步特性来监视观察者。任何其他机制都需要带外。您的问题与背压直接相关,因为它仅在观察者速度慢的情况下才相关。

我提到所有这些是为了证明我的说法,即 Rx 不是提供您正在寻找的那种优先级调度的好选择 - 实际上,一流的排队机制似乎更合适。

要解决手头的问题,您需要在自定义运算符中自行管理优先级排队。重述这个问题:你的意思是,如果事件在观察者处理 OnNext 事件期间到达,那么就会有大量事件要分派(dispatch),而不是典型的 FIFO 队列Rx 使用的那个,你想根据一些优先级进行调度。

需要注意的是,本着 Rx 如何不让多个观察者保持同步的精神,并发观察者可能会以不同的顺序看到事件,这对您来说可能是也可能不是问题。您可以使用 Publish 之类的机制来获得顺序一致性 - 但您可能不想这样做,因为在这种情况下事件传递的时间会变得非常不可预测且效率低下。

我确信有更好的方法可以做到这一点,但这里有一个基于优先级队列的交付示例 - 您可以使用更好的方法将其扩展为适用于多个流和优先级(甚至每个事件的优先级)队列实现(例如基于 b 树的优先级队列),但我选择保持它相当简单。即便如此,请注意代码必须解决的大量问题,围绕错误处理、完成等 - 我已经选择了何时发出这些信号,肯定还有很多其他有效的选择。

总而言之,这个实现肯定会让放弃为此使用 Rx 的想法。它足够复杂,无论如何这里都可能存在错误。正如我所说,可能会有更简洁的代码(特别是考虑到我已经付出了最少的努力!),但概念上,无论实现如何,我都对这个想法感到不舒服:

public static class ObservableExtensions
{
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
        this IObservable<TSource> source,
        IObservable<TSource> lowPriority,
        IScheduler scheduler = null)
    {    
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<TSource>(o => {    
            // BufferBlock from TPL dataflow is used as it is
            // handily awaitable. package: Microsoft.Tpl.Dataflow        
            var loQueue = new BufferBlock<TSource>();
            var hiQueue = new BufferBlock<TSource>();
            var errorQueue = new BufferBlock<Exception>();
            var done = new TaskCompletionSource<int>();
            int doneCount = 0;
            Action incDone = () => {
                var dc = Interlocked.Increment(ref doneCount);
                if(dc == 2)
                    done.SetResult(0);
            };
            source.Subscribe(
                x => hiQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            lowPriority.Subscribe(
                x => loQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            return scheduler.ScheduleAsync(async(ctrl, ct) => {
                while(!ct.IsCancellationRequested)
                {
                    TSource nextItem;
                    if(hiQueue.TryReceive(out nextItem)
                      || loQueue.TryReceive(out nextItem))
                        o.OnNext(nextItem);

                    else if(done.Task.IsCompleted)
                    {
                        o.OnCompleted();
                        return;
                    }

                    Exception error;                        
                    if(errorQueue.TryReceive(out error))
                    {
                        o.OnError(error);
                        return;
                    }

                    var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);    
                    var loAvailableAsync = loQueue.OutputAvailableAsync(ct);                    
                    var errAvailableAsync =
                        errorQueue.OutputAvailableAsync(ct);
                    await Task.WhenAny(
                        hiAvailableAsync,
                        loAvailableAsync,
                        errAvailableAsync,
                        done.Task);
                }
            });
        });
    }
}

和示例用法:

void static Main()
{
    var xs = Observable.Range(0, 3);
    var ys = Observable.Range(10, 3);

    var source = ys.MergeWithLowPriorityStream(xs);

    source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}

这将首先打印出 ys 的元素,表明它们的优先级更高。

关于c# - 将两个 Observable 与一个优先级更高的 Observable 合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29557484/

有关c# - 将两个 Observable 与一个优先级更高的 Observable 合并的更多相关文章

  1. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  2. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  3. ruby-on-rails - 渲染另一个 Controller 的 View - 2

    我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

  4. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  5. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  6. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  7. ruby - 如果指定键的值在数组中相同,如何合并哈希 - 2

    我有一个这样的哈希数组:[{:foo=>2,:date=>Sat,01Sep2014},{:foo2=>2,:date=>Sat,02Sep2014},{:foo3=>3,:date=>Sat,01Sep2014},{:foo4=>4,:date=>Sat,03Sep2014},{:foo5=>5,:date=>Sat,02Sep2014}]如果:date相同,我想合并哈希值。我对上面数组的期望是:[{:foo=>2,:foo3=>3,:date=>Sat,01Sep2014},{:foo2=>2,:foo5=>5:date=>Sat,02Sep2014},{:foo4=>4,:dat

  8. ruby - 当使用::指定模块时,为什么 Ruby 不在更高范围内查找类? - 2

    我刚刚被困在这个问题上一段时间了。以这个基地为例:moduleTopclassTestendmoduleFooendend稍后,我可以通过这样做在Foo中定义扩展Test的类:moduleTopmoduleFooclassSomeTest但是,如果我尝试通过使用::指定模块来最小化缩进:moduleTop::FooclassFailure这失败了:NameError:uninitializedconstantTop::Foo::Test这是一个错误,还是仅仅是Ruby解析变量名的方式的逻辑结果? 最佳答案 Isthisabug,or

  9. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

  10. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

随机推荐