我想知道是否有一种方法可以设置 RabbitMQ 或 Redis 与 Celery 一起工作,这样当我将任务发送到队列时,它不会进入任务列表,而是进入一组任务根据我的任务的有效负载进行键控,以避免重复。
这是我的设置以获得更多上下文: python + celery 。我已经尝试过 RabbitMQ 作为后端,现在我正在使用 Redis 作为后端,因为我不需要 100% 的可靠性、更容易使用、内存占用小等。
我有大约 1000 个 ID 需要重复完成。我的数据管道的第 1 阶段由调度程序触发,它输出第 2 阶段的任务。任务仅包含需要完成工作的 ID,实际数据存储在数据库中。我可以运行任何组合或序列的第 1 阶段和第 2 阶段任务而不会造成伤害。
如果第 2 阶段没有足够的处理能力来处理第 1 阶段输出的任务量,我的任务队列就会越来越大。如果任务队列使用集合而不是列表作为基础数据结构,则不必是这种情况。
是否有现成的解决方案可以从列表切换到集合作为分布式任务队列? celery 有这个能力吗?我最近看到 Redis 刚刚发布了一个队列系统的 alpha 版本,所以它还没有准备好用于生产。
我应该以不同的方式构建我的管道吗?
最佳答案
您可以使用外部数据结构来存储和监控 celery 队列的当前状态。 1.让我们以redis键值为例。每当您将任务推送到 celery 中时,您都会在 redis 中将带有“id”字段的键标记为 true。
在尝试推送带有任何 'id' 的新任务之前,您将检查带有 'id' 的键在 Redis 中是否为真,如果是,则跳过推送任务。
要在适当的时候清除 key ,您可以使用after_return。 celery 的处理程序,在任务返回时运行。此处理程序将取消设置 redis 中的键“id”,从而清除下一个任务推送的锁。
此方法确保您在 celery 队列中运行的每个任务 ID 只有一个实例。您还可以通过在 redis 键上使用 INCR 和 DECR 命令,在推送任务和任务的 after_return 时,将其增强为每个 ID 仅允许 N 个任务。
关于python - 基于集合作为数据结构而不是列表的分布式任务队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30278054/
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
是否有类似“RVMuse1”或“RVMuselist[0]”之类的内容而不是键入整个版本号。在任何时候,我们都会看到一个可能包含5个或更多ruby的列表,我们可以轻松地键入一个数字而不是X.X.X。这也有助于rvmgemset。 最佳答案 这在RVM2.0中是可能的=>https://docs.google.com/document/d/1xW9GeEpLOWPcddDg_hOPvK4oeLxJmU3Q5FiCNT7nTAc/edit?usp=sharing-知道链接的任何人都可以发表评论
对于作为String#tr参数的单引号字符串文字中反斜杠的转义状态,我觉得有些神秘。你能解释一下下面三个例子之间的对比吗?我特别不明白第二个。为了避免复杂化,我在这里使用了'd',在双引号中转义时不会改变含义("\d"="d")。'\\'.tr('\\','x')#=>"x"'\\'.tr('\\d','x')#=>"\\"'\\'.tr('\\\d','x')#=>"x" 最佳答案 在tr中转义tr的第一个参数非常类似于正则表达式中的括号字符分组。您可以在表达式的开头使用^来否定匹配(替换任何不匹配的内容)并使用例如a-f来匹配一
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵