草庐IT

javascript - 强制完成 rxjs 观察者

coder 2025-04-02 原文

我有一个 rxjs 观察器(实际上是一个 Subject),它永远跟踪一个文件,就像 tail -f 一样。例如,它非常适合监控日志文件。

这种“永远”的行为对我的应用程序来说很好,但对测试来说却很糟糕。目前我的应用程序可以运行,但我的测试永远挂起。

我想强制观察者更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做?

我尝试在我返回的 Subject 句柄上调用 onCompleted 但此时它基本上被转换为观察者并且你不能强制它关闭,错误是:

Object # has no method 'onCompleted'

这是源代码:

function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

下面是无法弄清楚如何强制永远结束的测试代码(磁带样式测试)。注意“非法”行:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})

最佳答案

听起来你解决了你的问题,但对于你原来的问题

I'd like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?

一般来说,当您有更好的选择时,不鼓励使用 Subject,因为它们往往是人们使用他们熟悉的编程风格的拐杖。与其尝试使用 Subject,我建议您考虑每个事件在 Observable 生命周期中的含义。

包装事件发射器

EventEmitter#on/off 模式的包装器已经存在,其形式为 Observable.fromEvent。它处理清理并仅在有监听器时保持订阅有效。因此 ObserveTail 可以重构为

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

与普通使用 Subjects 相比,它有几个好处,第一,您现在可以在下游实际看到错误,第二,这将在您完成事件后处理事件清理。

避免*同步方法

然后这可以在不使用 readSync

的情况下滚动到您的文件存在检查中
//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

接下来,您可以使用 flatMap 来简化过滤器/ map / map 序列。

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

不要发出信号,取消订阅

当流仅沿一个方向传播时,如何停止“发出停止信号”。我们实际上很少想让观察者直接与 Observable 通信,因此更好的模式是不实际“发出”停止信号,而是简单地取消订阅 Observable 并将其留给 Observable 的行为从那里确定它应该做什么。

本质上,您的 Observer 真的不应该关心您的 Observable 而不仅仅是说“我在这里完成了”。

为此,您需要声明停止时要达到的条件。

在这种情况下,由于您只是在测试用例中的一组数字后停止,您可以使用 take 取消订阅。因此最终的订阅 block 看起来像:

result
 //After lines is reached this will complete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

编辑 1

正如评论中所指出的,在这个特定的 api 的情况下,没有真正的“关闭”事件,因为 Tail 本质上是一个无限操作。从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止监听时,我们将停止发送事件。所以你的 block 可能最终看起来像:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

finallyshare 运算符的添加创建了一个对象,该对象将在新订阅者到达时附加到尾部,并且只要有至少有一个订阅者还在听。然而,一旦所有订阅者都完成了,我们就可以安全地unwatch尾部了。

关于javascript - 强制完成 rxjs 观察者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35051883/

有关javascript - 强制完成 rxjs 观察者的更多相关文章

  1. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  2. ruby-on-rails - 使用 javascript 更改数据方法不会更改 ajax 调用用户的什么方法? - 2

    我遇到了一个非常奇怪的问题,我很难解决。在我看来,我有一个与data-remote="true"和data-method="delete"的链接。当我单击该链接时,我可以看到对我的Rails服务器的DELETE请求。返回的JS代码会更改此链接的属性,其中包括href和data-method。再次单击此链接后,我的服务器收到了对新href的请求,但使用的是旧的data-method,即使我已将其从DELETE到POST(它仍然发送一个DELETE请求)。但是,如果我刷新页面,HTML与"new"HTML相同(随返回的JS发生变化),但它实际上发送了正确的请求类型。这就是这个问题令我困惑的

  3. Ruby:行 "m = Hash.new {|h,k| h[k] = []}"完成了什么而 "Hash.new"没有完成? - 2

    一边学习thisRailscast我从Rack中看到了以下源代码:defself.middleware@middleware||=beginm=Hash.new{|h,k|h[k]=[]}m["deployment"].concat[[Rack::ContentLength],[Rack::Chunked],logging_middleware]m["development"].concatm["deployment"]+[[Rack::ShowExceptions],[Rack::Lint]]mendend我的问题是关于第三行。什么是传递block{|h,k|h[k]=[]}到Has

  4. ruby - 强制浏览器下载文件而不是打开文件 - 2

    我要下载http://foobar.com/song.mp3作为song.mp3,而不是让Chrome在其native中打开它浏览器中的播放器。我怎样才能做到这一点? 最佳答案 您只需要确保发送这些header:Content-Disposition:attachment;filename=song.mp3;Content-Type:application/octet-streamContent-Transfer-Encoding:binarysend_file方法为您完成:get'/:file'do|file|file=File.

  5. ruby - 在 Mechanize 中使用 JavaScript 单击链接 - 2

    我有这个:AccountSummary我想单击该链接,但在使用link_to时出现错误。我试过:bot.click(page.link_with(:href=>/menu_home/))bot.click(page.link_with(:class=>'top_level_active'))bot.click(page.link_with(:href=>/AccountSummary/))我得到的错误是:NoMethodError:nil:NilClass的未定义方法“[]” 最佳答案 那是一个javascript链接。Mechan

  6. ruby - 强制 Ruby 不以标准形式/科学记数法/指数记数法输出 float - 2

    我遇到了同样的问题here对于python,但对于ruby​​。我需要输出这样一个小数字:0.00001,而不是1e-5。有关我的特定问题的更多信息,我正在使用f.write("Mynumber:"+small_number.to_s+"\n")输出到一个文件对于我的问题,准确性不是什么大问题,所以只做一个if语句来检查是否small_number那么更通用的方法是什么? 最佳答案 f.printf"Mynumber:%.5f\n",small_number您可以将.5(小数点右侧5位数字)替换为您喜欢的任何特定格式大小,例如,%8

  7. ruby-on-rails - 自动完成搜索的 Rails 实现 - 2

    我不确定如何为我的搜索功能添加自动完成表单。"get"do%>nil%>我有一个具有自定义操作的Controllerdefquery@users=Search.user(params[:query])@article=Search.article(params[:query])end模型如下:defself.user(search)ifsearchUser.find(:all,:conditions=>['first_nameLIKE?',"%#{search}%"])elseUser.find(:all)endenddefself.article(search)ifsearchArt

  8. ruby-on-rails - 如何在 Rails 脚手架生成器上强制使用单数表名? - 2

    我正在使用遗留数据库并需要创建一些CRUD。我如何使用scaffold生成器并告诉他表的确切名称以避免复数化过程?表格也是西类牙语。 最佳答案 您可以只使用ActiveRecord::Base.table_name=方法手动设置表名。因此,在您的模型中您可以:classOrderDetail 关于ruby-on-rails-如何在Rails脚手架生成器上强制使用单数表名?,我们在StackOverflow上找到一个类似的问题: https://stackove

  9. ruby - 强制使用特定的 gem 版本作为默认版本? - 2

    假设我安装了三个gem:package-0.4.0、package-0.5.0和package-0.5.0-jbfink(我构建了-jbfink一个,因为我对0.5做了非常小的改动.0的来源,并希望将其与官方版本区分开来)。是否有gem(或其他命令)将其设为默认值?现在我已经安装了所有三个,但我的shell正在从package-0.5.0中获取可执行文件,我宁愿它默认为0.5.0-jbfink。将0.5.0-jbfink命名为0.5.1解决了这个问题,但我不想这样做,因为我不想与正式发布的0.5.1出现冲突。 最佳答案 转到conf

  10. javascript - jQuery 的 jquery-1.10.2.min.map 正在触发 404(未找到) - 2

    我看到有关未找到文件min.map的错误消息:GETjQuery'sjquery-1.10.2.min.mapistriggeringa404(NotFound)截图这是从哪里来的? 最佳答案 如果ChromeDevTools报告.map文件的404(可能是jquery-1.10.2.min.map、jquery.min.map或jquery-2.0.3.min.map,但任何事情都可能发生)首先要知道的是,这仅在使用DevTools时才会请求。您的用户不会遇到此404。现在您可以修复此问题或禁用sourcemap功能。修复:获取文

随机推荐