我们正在尝试运行一个分布在 docker swarm 集群上的简单管道。 luigi 工作人员被部署为复制的 docker 服务。他们成功启动,在向 luigi-server 请求工作几秒钟后,他们开始死亡,因为没有分配工作给他们,所有任务最终都分配给了一个 worker 。
我们必须在 worker 的 luigi.cfg 中设置 keep_alive=True 以强制他们不要死,但在管道完成后保留 worker 似乎是个坏主意。 有没有办法控制工作分配?
我们的测试管道:
class RunAllTasks(luigi.Task):
tasks = luigi.IntParameter()
sleep_time = luigi.IntParameter()
def requires(self):
for i in range(self.tasks):
yield RunExampleTask(i, self.sleep_time)
def run(self):
with self.output().open('w') as f:
f.write('All done!')
def output(self):
return LocalTarget('/data/RunAllTasks.txt')
class RunExampleTask(luigi.Task):
number = luigi.IntParameter()
sleep_time = luigi.IntParameter()
@property
def cmd(self):
return """
docker run --rm --name example_{number} hello-world
""".format(number=self.number)
def run(self):
time.sleep(self.sleep_time)
logger.debug(self.cmd)
out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
logger.debug(out)
with self.output().open('w') as f:
f.write(str(out))
def output(self):
return LocalTarget('/data/{number}.txt'.format(number=self.number))
if __name__ == "__main__":
luigi.run()
最佳答案
您的问题是一次 yield 单个需求的结果,而不是您想一次 yield 所有这些需求,如下所示:
def requires(self):
reqs = []
for i in range(self.tasks):
reqs.append(RunExampleTask(i, self.sleep_time))
yield reqs
关于docker - 由于 Luigi 的工作分配不均, worker 过早死亡 (2.6.1),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43851114/
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
通过rubykoans.com,我在about_array_assignment.rb中遇到了这两段代码你怎么知道第一个是非并行赋值,第二个是一个变量的并行赋值?在我看来,除了命名差异之外,代码几乎完全相同。4deftest_non_parallel_assignment5names=["John","Smith"]6assert_equal["John","Smith"],names7end45deftest_parallel_assignment_with_one_variable46first_name,=["John","Smith"]47assert_equal'John
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion在首页我有:汽车:VolvoSaabMercedesAudistatic_pages_spec.rb中的测试代码:it"shouldhavetherightselect"dovisithome_pathit{shouldhave_select('cars',:options=>['volvo','saab','mercedes','audi'])}end响应是rspec./spec/request
在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo
我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案
使用Ruby1.9.2运行IDE提示说需要gemruby-debug-base19x并提供安装它。但是,在尝试安装它时会显示消息Failedtoinstallgems.Followinggemswerenotinstalled:C:/ProgramFiles(x86)/JetBrains/RubyMine3.2.4/rb/gems/ruby-debug-base19x-0.11.30.pre2.gem:Errorinstallingruby-debug-base19x-0.11.30.pre2.gem:The'linecache19'nativegemrequiresinstall
1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里
我知道全局变量$!包含最新的异常对象,但我对下面的语法感到困惑。谁能帮助我理解以下语法?rescue$! 最佳答案 此构造可防止异常停止您的程序并使堆栈跟踪冒泡。它还会将该异常作为值返回,这很有用。a=get_me_datarescue$!在此行之后,a将保存请求的数据或异常。然后您可以分析该异常并采取相应措施。defget_me_dataraise'Nodataforyou'enda=get_me_datarescue$!puts"Executioncarrieson"pa#>>Executioncarrieson#>>#更现实的