我有一个 rxjs 观察器(实际上是一个 Subject),它永远跟踪一个文件,就像 tail -f 一样。例如,它非常适合监控日志文件。
这种“永远”的行为对我的应用程序来说很好,但对测试来说却很糟糕。目前我的应用程序可以运行,但我的测试永远挂起。
我想强制观察者更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做?
我尝试在我返回的 Subject 句柄上调用 onCompleted 但此时它基本上被转换为观察者并且你不能强制它关闭,错误是:
Object # has no method 'onCompleted'
这是源代码:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
下面是无法弄清楚如何强制永远结束的测试代码(磁带样式测试)。注意“非法”行:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
最佳答案
听起来你解决了你的问题,但对于你原来的问题
I'd like to force an observer change to complete early, because my test code knows how many lines should be in the file. How do I do this?
一般来说,当您有更好的选择时,不鼓励使用 Subject,因为它们往往是人们使用他们熟悉的编程风格的拐杖。与其尝试使用 Subject,我建议您考虑每个事件在 Observable 生命周期中的含义。
EventEmitter#on/off 模式的包装器已经存在,其形式为 Observable.fromEvent。它处理清理并仅在有监听器时保持订阅有效。因此 ObserveTail 可以重构为
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var close = Rx.Observable.fromEvent(tail, "close");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line.takeUntil(close).merge(error).subscribe(observer);
});
}
与普通使用 Subjects 相比,它有几个好处,第一,您现在可以在下游实际看到错误,第二,这将在您完成事件后处理事件清理。
然后这可以在不使用 readSync
//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
.filter(function(exists) { return exists; })
.flatMap(ObserveTail(filename));
接下来,您可以使用 flatMap 来简化过滤器/ map / map 序列。
var result = source.flatMap(function(x) {
try {
return Rx.Observable.just(JSON.parse(x));
} catch (e) {
return Rx.Observable.empty();
}
},
//This allows you to map the result of the parsed value
function(x, json) {
return json.name;
})
.timeout(10000, "observer timed out");
当流仅沿一个方向传播时,如何停止“发出停止信号”。我们实际上很少想让观察者直接与 Observable 通信,因此更好的模式是不实际“发出”停止信号,而是简单地取消订阅 Observable 并将其留给 Observable 的行为从那里确定它应该做什么。
本质上,您的 Observer 真的不应该关心您的 Observable 而不仅仅是说“我在这里完成了”。
为此,您需要声明停止时要达到的条件。
在这种情况下,由于您只是在测试用例中的一组数字后停止,您可以使用 take 取消订阅。因此最终的订阅 block 看起来像:
result
//After lines is reached this will complete.
.take(lines)
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
编辑 1
正如评论中所指出的,在这个特定的 api 的情况下,没有真正的“关闭”事件,因为 Tail 本质上是一个无限操作。从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止监听时,我们将停止发送事件。所以你的 block 可能最终看起来像:
function ObserveTail(filename) {
return Rx.Observable.create(function(observer) {
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
var line = Rx.Observable.fromEvent(tail, "line");
var error = Rx.Observable.fromEvent(tail, "error")
.flatMap(function(err) { return Rx.Observable.throw(err); });
//Only take events until close occurs and wrap in the error for good measure
//The latter two are terminal events in this case.
return line
.finally(function() { tail.unwatch(); })
.merge(error).subscribe(observer);
}).share();
}
finally 和 share 运算符的添加创建了一个对象,该对象将在新订阅者到达时附加到尾部,并且只要有至少有一个订阅者还在听。然而,一旦所有订阅者都完成了,我们就可以安全地unwatch尾部了。
关于javascript - 强制完成 rxjs 观察者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35051883/
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
我遇到了一个非常奇怪的问题,我很难解决。在我看来,我有一个与data-remote="true"和data-method="delete"的链接。当我单击该链接时,我可以看到对我的Rails服务器的DELETE请求。返回的JS代码会更改此链接的属性,其中包括href和data-method。再次单击此链接后,我的服务器收到了对新href的请求,但使用的是旧的data-method,即使我已将其从DELETE到POST(它仍然发送一个DELETE请求)。但是,如果我刷新页面,HTML与"new"HTML相同(随返回的JS发生变化),但它实际上发送了正确的请求类型。这就是这个问题令我困惑的
一边学习thisRailscast我从Rack中看到了以下源代码:defself.middleware@middleware||=beginm=Hash.new{|h,k|h[k]=[]}m["deployment"].concat[[Rack::ContentLength],[Rack::Chunked],logging_middleware]m["development"].concatm["deployment"]+[[Rack::ShowExceptions],[Rack::Lint]]mendend我的问题是关于第三行。什么是传递block{|h,k|h[k]=[]}到Has
我要下载http://foobar.com/song.mp3作为song.mp3,而不是让Chrome在其native中打开它浏览器中的播放器。我怎样才能做到这一点? 最佳答案 您只需要确保发送这些header:Content-Disposition:attachment;filename=song.mp3;Content-Type:application/octet-streamContent-Transfer-Encoding:binarysend_file方法为您完成:get'/:file'do|file|file=File.
我有这个:AccountSummary我想单击该链接,但在使用link_to时出现错误。我试过:bot.click(page.link_with(:href=>/menu_home/))bot.click(page.link_with(:class=>'top_level_active'))bot.click(page.link_with(:href=>/AccountSummary/))我得到的错误是:NoMethodError:nil:NilClass的未定义方法“[]” 最佳答案 那是一个javascript链接。Mechan
我遇到了同样的问题here对于python,但对于ruby。我需要输出这样一个小数字:0.00001,而不是1e-5。有关我的特定问题的更多信息,我正在使用f.write("Mynumber:"+small_number.to_s+"\n")输出到一个文件对于我的问题,准确性不是什么大问题,所以只做一个if语句来检查是否small_number那么更通用的方法是什么? 最佳答案 f.printf"Mynumber:%.5f\n",small_number您可以将.5(小数点右侧5位数字)替换为您喜欢的任何特定格式大小,例如,%8
我不确定如何为我的搜索功能添加自动完成表单。"get"do%>nil%>我有一个具有自定义操作的Controllerdefquery@users=Search.user(params[:query])@article=Search.article(params[:query])end模型如下:defself.user(search)ifsearchUser.find(:all,:conditions=>['first_nameLIKE?',"%#{search}%"])elseUser.find(:all)endenddefself.article(search)ifsearchArt
我正在使用遗留数据库并需要创建一些CRUD。我如何使用scaffold生成器并告诉他表的确切名称以避免复数化过程?表格也是西类牙语。 最佳答案 您可以只使用ActiveRecord::Base.table_name=方法手动设置表名。因此,在您的模型中您可以:classOrderDetail 关于ruby-on-rails-如何在Rails脚手架生成器上强制使用单数表名?,我们在StackOverflow上找到一个类似的问题: https://stackove
假设我安装了三个gem:package-0.4.0、package-0.5.0和package-0.5.0-jbfink(我构建了-jbfink一个,因为我对0.5做了非常小的改动.0的来源,并希望将其与官方版本区分开来)。是否有gem(或其他命令)将其设为默认值?现在我已经安装了所有三个,但我的shell正在从package-0.5.0中获取可执行文件,我宁愿它默认为0.5.0-jbfink。将0.5.0-jbfink命名为0.5.1解决了这个问题,但我不想这样做,因为我不想与正式发布的0.5.1出现冲突。 最佳答案 转到conf
我看到有关未找到文件min.map的错误消息:GETjQuery'sjquery-1.10.2.min.mapistriggeringa404(NotFound)截图这是从哪里来的? 最佳答案 如果ChromeDevTools报告.map文件的404(可能是jquery-1.10.2.min.map、jquery.min.map或jquery-2.0.3.min.map,但任何事情都可能发生)首先要知道的是,这仅在使用DevTools时才会请求。您的用户不会遇到此404。现在您可以修复此问题或禁用sourcemap功能。修复:获取文