草庐IT

python - Luigi 可以传播异常或返回任何结果吗?

我正在使用Luigi启动一些管道。举个简单的例子task=myTask()w=Worker(scheduler=CentralPlannerScheduler(),worker_processes=1)w.add(task)w.run()现在假设myTask在执行期间引发异常。我所能拥有的只是luigi显示异常的日志。luigi有什么方法可以传播它或至少返回一个failure状态吗?然后我就可以让我的程序根据该状态使用react。谢谢。编辑当我存储结果时,我忘记指定luigi的输出以数据库为目标。如果引发异常,则不会存储任何结果,但不会将异常传播出luigi。我想知道luigi是否可以

python - 如何重置 luigi 任务状态?

目前,我有一堆luigi任务排在一起,有一个简单的依赖链(a->b->c->d)。d首先执行,a最后执行。a是被触发的任务。除了a之外的所有目标都返回一个luigi.LocalTarget()对象并有一个通用的luigi.Parameter()这是一个字符串(包含日期和时间)。在luigi中央服务器(启用了历史记录)上运行。问题是,当我重新运行所述任务a时,luigi检查历史记录并查看该特定任务之前是否运行过,如果它的状态为DONE,则不会t运行任务(在本例中为d),我不能这样做,更改字符串无济于事(向其添加了一个随机微秒)。如何强制运行任务? 最佳答案

python - 在 Luigi Orchestrator 中并行化任务

我定义了三个任务T1、T2和T3,然后定义了一个任务T4,如下:classT4(luigi.Task)defrequires(self):return[T1(),T2(),T3()]是否有一种自然的方式告诉Luigi我希望这些任务T1、T2和T3并行执行? 最佳答案 这取决于T1、T2和T3有什么依赖关系。如果他们没有其他任务作为共同依赖项,您只需运行指定--workers=3和Luigiwillruneachtaskinaseparateworker的任务即可。. 关于python-在

python - 从 S3 开始的 Luigi Pipeline

我的初始文件在AWSS3中.有人可以指出我需要如何在LuigiTask中设置它吗??我查看了文档并找到了luigi.S3但我不清楚该怎么做,然后我在网上搜索并只获得来自mortar-luigi的链接。并在luigi之上实现。更新按照为@matagus提供的示例(我也按照建议创建了~/.boto文件):#coding:utf-8importluigifromluigi.s3importS3Target,S3ClientclassMyS3File(luigi.ExternalTask):defoutput(self):returnS3Target('s3://my-bucket/19170