我们正在为 Apache Beam 管道构建集成测试,但遇到了一些问题。有关上下文,请参见下文...
关于我们管道的详细信息:
PubsubIO作为我们的数据源(无界PCollection)CombineFn 和非常简单的窗口/触发策略JdbcIO,使用org.neo4j.jdbc.Driver写入Neo4j目前的测试方法:
OurPipeline.main(TestPipeline.convertToArgs(options)PubsubIO 将从中读取这是一个简单的集成测试,它将验证我们的管道作为一个整体是否按预期运行。
我们目前遇到的问题是,当我们运行我们的管道时,它会阻塞。我们正在使用 DirectRunner 和 pipeline.run()(不是 pipeline.run().waitUntilFinish()) ,但测试似乎在运行管道后挂起。因为这是一个无界的 PCollection(以流模式运行),管道不会终止,因此不会到达它之后的任何代码。
所以,我有几个问题:
1) 有没有办法运行一个管道,然后手动停止它?
2) 有没有办法异步运行管道?理想情况下,它会启动管道(然后会持续轮询 Pub/Sub 以获取数据),然后继续处理负责发布到 Pub/Sub 的代码。
3) 这种集成测试管道的方法是否合理,或者是否有更好的方法可能更直接?此处提供任何信息/指导,我们将不胜感激。
如果我可以提供任何额外的代码/上下文,请告诉我 - 谢谢!
最佳答案
通过将 isBlockOnRun 管道选项设置为 false,您可以使用 DirectRunner 异步运行管道。只要您保持对返回的 PipelineResult 的引用可用,对该结果调用 cancel() 应该会停止管道。
对于你的第三个问题,你的设置似乎是合理的。但是,如果您想要对管道进行较小规模的测试(需要较少的组件),您可以将所有处理逻辑封装在自定义 PTransform 中。此 PTransform 应从输入源获取已完全解析的输入,并生成尚未为输出接收器解析的输出。
完成后,您可以使用 Create(通常不会执行触发)或 TestStream(这可能取决于您构建 的方式TestStream) 与 DirectRunner 生成有限数量的输入数据,将此处理 PTransform 应用于该 PCollection,并使用 PAsert 输出 PCollection 以验证管道是否生成了您期望的输出。
有关测试的更多信息,Beam 网站在 Programming Guide 中提供了有关这些测试类型的信息和一个 blog post关于使用 TestStream 测试管道。
关于java - Apache Beam - 与无限 PCollection 的集成测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44727839/
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
我有一个围绕一些对象的包装类,我想将这些对象用作散列中的键。包装对象和解包装对象应映射到相同的键。一个简单的例子是这样的:classAattr_reader:xdefinitialize(inner)@inner=innerenddefx;@inner.x;enddef==(other)@inner.x==other.xendenda=A.new(o)#oisjustanyobjectthatallowso.xb=A.new(o)h={a=>5}ph[a]#5ph[b]#nil,shouldbe5ph[o]#nil,shouldbe5我试过==、===、eq?并散列所有无济于事。
我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test
我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r
我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel