草庐IT

关于c#:Observable.Repeat 势不可挡,是bug还是特性?

codeneng 2023-03-28 原文

The Observable.Repeat is unstoppable, is it a bug or a feature?

本问题已经有最佳答案,请猛点这里访问。

当源 observable 的通知是同步的时,我注意到 Repeat 运算符的行为有些奇怪。生成的 observable 不能用后续的 TakeWhile 操作符停止,并且显然会永远继续运行。为了演示,我创建了一个源 observable,它产生一个值,它在每次订阅时递增。第一个订阅者获得值 1,第二个获得值 2 依此类推:

1
2
3
4
5
6
7
8
9
10
11
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();

    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});

然后我将运算符 RepeatTakeWhileLastAsync 附加到这个 observable 上,这样程序就会等到组合后的 observable 产生它的最后一个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

这是这个程序的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...

它永远不会结束!虽然 LastAsync 已经产生了它的值并完成了,但 Repeat 操作符仍在旋转!

只有当源 observable 同步通知其订阅者时才会发生这种情况。例如,取消注释行 //await Task.Yield(); 后,程序的行为与预期相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done

Repeat 操作符停止旋转,尽管它没有报告完成(我的猜测是它已被取消订阅)。

有没有什么方法可以使 Repeat 操作符的行为保持一致,而不管它接收到的通知类型(同步还是异步)?

.NET Core 3.0、C# 8、System.Reactive 4.3.2、控制台应用程序

  • 我一直说 Observable.Create 很糟糕......
  • 好吧,它更多地与 Scheduler.Immediate 有关。将调度程序更改为 incremental.ObserveOn(Scheduler.Default).Repeat() 并查看会发生什么。
  • 我怀疑你锁定了取消订阅所需的线程。
  • @Enigmativity 它与 Observable.Create 无关。此实现也存在问题:var incremental = Observable.Defer(() => Observable.Return(++incrementalValue));
  • 你是对的。然而 Observable.Create 使得创建具有此问题的可观察对象变得太容易了。
  • @Enigmativity with ObserveOn(Scheduler.Default) 通知是从各种线程接收的,本质上是模仿我在生产者内部使用 await Task.Yield() 的测试。但是我检查了另一个选项: ObserveOn(Scheduler.CurrentThread) 似乎可以彻底解决问题!使用此选项,程序按预期工作,并且一切都发生在一个线程中。 ObserveOn(Scheduler.CurrentThread) 有什么我应该注意的缺点吗?
  • Repeat之前用SubscribeOn(Scheduler.CurrentThread)也解决了这个问题,天知道为什么!
  • 从内存 Scheduler.CurrentThread 使用蹦床在当前线程(即当前上下文)上进行调度,因此它不会立即运行并取消阻塞线程。否则,使用 Scheduler.Immediate 您将面临死锁的风险。
  • @Enigmativity 是我的猜测,但是控制台应用程序没有同步上下文,因此令人费解。
  • 您应该查看 System.Reactive.Concurrency 命名空间的源代码。那会让你的眼睛流泪。他们在那里做了很多技巧来使 Rx 工作。那里有一个同步上下文。


您可能期望 Repeat 的实现以 OnCompleted 通知为特色,但事实证明它是根据 Concat 实现的 - 无限流。

1
2
3
4
5
6
7
8
9
10
    public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
        RepeatInfinite(source).Concat();

    private static IEnumerable< T > RepeatInfinite< T >(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

将责任转移到 Concat - 我们可以创建一个简化版本(血淋淋的实现细节在 TailRecursiveSink.cs 中)。除非 await Task.Yield().

提供了不同的执行上下文,否则它仍然会继续旋转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static IObservable< T > ConcatEx< T >(this IEnumerable<IObservable< T >> enumerable) =>
    Observable.Create< T >(observer =>
    {
        var check = new BooleanDisposable();

        IDisposable loopRec(IScheduler inner, IEnumerator<IObservable< T >> enumerator)
        {
            if (check.IsDisposed)
                return Disposable.Empty;

            if (enumerator.MoveNext()) //this never returns false
                return enumerator.Current.Subscribe(
                    observer.OnNext,
                    () => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
                );
            else
                return inner.Schedule(observer.OnCompleted); //this never runs
        }

        Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
        return check;
    });

作为一个无限流,enumerator.MoveNext() 总是返回 true,所以另一个分支永远不会运行 - 这是预期的;这不是我们的问题。

o.OnCompleted() 被调用时,它立即调度下一个迭代循环
Schedule(enumerator, loopRec) 同步调用下一个 o.OnCompleted(),并且它无限地继续 - 没有一点可以逃脱这个递归。

如果你有一个带有 await Task.Yield() 的上下文切换,那么 Schedule(enumerator, loopRec) 会立即退出,并且 o.OnCompleted() 会被非同步调用。

RepeatConcat 使用当前线程在不改变上下文的情况下工作 - 这不是不正确的行为,但是当同样的上下文也用于推送通知时,它可能导致死锁或被陷入
永久蹦床。

带注释的调用栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code]
Main(args) //incremental.RepeatEx()...

  • 谢谢阿斯蒂。我不能指望更彻底的答案!
  • @TheodorZoulias 欢迎您!我利用挖掘源代码的经验来解决另一个问题。
  • 抱歉,我之前没有尝试过!在我看来,评论中得出了一些结论。
  • 是的,我们找到了使用 Scheduler.CurrentThread 的解决方法,但是导致默认行为的原因仍然是个谜。 ??
  • 啊,在 ConcatEx 实现中将 `Scheduler.Immediate` 更改为 Scheduler.CurrentThread 解决了这个问题。
  • 但是在 Scheduler.CurrentThread 上运行 Repeat 可能会导致许多其他问题。我的意思是一般。
  • 是的,关于 ToObservable().ToEnumerable().ToObservable() 问题的另一个问题非常有启发性。如果你推得太多,RX 抽象就会开始泄漏!
  • 同意。大多数时候 Rx 是"不要担心并发",但是当你把任何东西推到极限时,泄漏抽象的法则就会迎头赶上。
  • 如果这个谜团解开了,你能把它标记为已回答吗?

有关关于c#:Observable.Repeat 势不可挡,是bug还是特性?的更多相关文章

  1. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

  2. C# 到 Ruby sha1 base64 编码 - 2

    我正在尝试在Ruby中复制Convert.ToBase64String()行为。这是我的C#代码:varsha1=newSHA1CryptoServiceProvider();varpasswordBytes=Encoding.UTF8.GetBytes("password");varpasswordHash=sha1.ComputeHash(passwordBytes);returnConvert.ToBase64String(passwordHash);//returns"W6ph5Mm5Pz8GgiULbPgzG37mj9g="当我在Ruby中尝试同样的事情时,我得到了相同sha

  3. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  4. ruby-on-rails - Ruby 流量控制 : throw an exception, 返回 nil 还是让它失败? - 2

    我在思考流量控制的最佳实践。我应该走哪条路?1)不要检查任何东西并让程序失败(更清晰的代码,自然的错误消息):defself.fetch(feed_id)feed=Feed.find(feed_id)feed.fetchend2)通过返回nil静默失败(但是,“CleanCode”说,你永远不应该返回null):defself.fetch(feed_id)returnunlessfeed_idfeed=Feed.find(feed_id)returnunlessfeedfeed.fetchend3)抛出异常(因为不按id查找feed是异常的):defself.fetch(feed_id

  5. ruby - 使用哪个,eruby 还是 erb? - 2

    eruby和erb有什么区别?哪些考虑因素会促使我选择其中之一?我的应用程序正在为网络设备(路由器、负载平衡器、防火墙等)生成配置文件。我的计划是对配置文件进行模板化,在源文件中使用嵌入式ruby​​(通过eruby或erb)来执行诸如迭代生成路由器的所有接口(interface)配置block之类的操作(这些block都非常相似,仅在标签上有所不同和IP地址)。例如,我可能有这样一个配置模板文件:hostnamesample-routerlogging10.5.16.26当通过嵌入式ruby​​解释器(erb或eruby)运行时,会产生以下输出:hostnamesample-rout

  6. ruby-on-rails - 关于 Ruby 的一般问题 - 2

    我在我的rails应用程序中安装了来自github.com的acts_as_versioned插件,但有一段代码我不完全理解,我希望有人能帮我解决这个问题class_eval我知道block内的方法(或任何它是什么)被定义为类内的实例方法,但我在插件的任何地方都找不到定义为常量的CLASS_METHODS,而且我也不确定是什么here,并且有问题的代码从lib/acts_as_versioned.rb的第199行开始。如果有人愿意告诉我这里的内幕,我将不胜感激。谢谢-C 最佳答案 这是一个异端。http://en.wikipedia

  7. ruby - 我怎样才能更好地了解/了解更多关于 Ruby 的知识? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我最近开始学习Ruby,这是我的第一门编程语言。我对语法感到满意,并且我已经完成了许多只教授相同基础知识的教程。我已经写了一些小程序(包括我自己的数组排序方法,在有人告诉我谷歌“冒泡排序”之前我认为它非常聪明),但我觉得我需要尝试更大更难的东西来理解更多关于Ruby.关于如何执行此操作的任何想法?

  8. ruby - 关于 Ruby 中 Dir[] 和 File.join() 的混淆 - 2

    我在Ruby中遇到了一个关于Dir[]和File.join()的简单程序,blobs_dir='/path/to/dir'Dir[File.join(blobs_dir,"**","*")].eachdo|file|FileUtils.rm_rf(file)ifFile.symlink?(file)我有两个困惑:首先,File.join(@blobs_dir,"**","*")中的第二个和第三个参数是什么意思?其次,Dir[]在Ruby中有什么用?我只知道它等价于Dir.glob(),但是,我对Dir.glob()确实不是很清楚。 最佳答案

  9. c# - C# 中的 Flatten Ruby 方法 - 2

    我如何做Ruby方法"Flatten"RubyMethod在C#中。此方法将锯齿状数组展平为一维数组。例如:s=[1,2,3]#=>[1,2,3]t=[4,5,6,[7,8]]#=>[4,5,6,[7,8]]a=[s,t,9,10]#=>[[1,2,3],[4,5,6,[7,8]],9,10]a.flatten#=>[1,2,3,4,5,6,7,8,9,10 最佳答案 递归解决方案:IEnumerableFlatten(IEnumerablearray){foreach(variteminarray){if(itemisIEnume

  10. ruby - 可以像在 C# 中使用#region 一样在 Ruby 中使用 begin/end 吗? - 2

    我最近从C#转向了Ruby,我发现自己无法制作可折叠的标记代码区域。我只是想到做这种事情应该没问题:classExamplebegin#agroupofmethodsdefmethod1..enddefmethod2..endenddefmethod3..endend...但是这样做真的可以吗?method1和method2最终与method3是同一种东西吗?还是有一些我还没有见过的用于执行此操作的Ruby惯用语? 最佳答案 正如其他人所说,这不会改变方法定义。但是,如果要标记方法组,为什么不使用Ruby语义来标记它们呢?您可以使用

随机推荐