我关注了celery docs在我的开发机器上定义 2 个队列。
我的 celery 设置:
CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60 # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'fs_feeds',
},
}
我在项目的 virtualenv 中打开了两个终端窗口,并运行了以下命令:
terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker
我得到的是两个队列都在处理所有任务。
我的目标是让一个队列只处理 CELERY_ROUTES 中定义的一个任务,并让默认队列处理所有其他任务。
我也关注了这个SO question ,rabbitmqctl list_queues返回celery 0,运行rabbitmqctl list_bindings两次返回exchange celery queue celery []。重启兔子服务器没有改变任何东西。
最佳答案
好吧,我想通了。以下是我的整个设置、设置以及如何运行 celery ,供那些可能想知道与我的问题相同的人使用。
设置
CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
'arena.social.tasks.Update': {
'queue': 'feeds',
'routing_key': 'long_tasks',
},
}
如何运行celery?
终端 - 选项卡 1:
celery -A proj worker -Q default -l debug -n default_worker
这将启动第一个使用默认队列中的任务的工作人员。笔记! -n default_worker 对于第一个 worker 来说不是必须的,但是如果你有任何其他 celery 实例正在运行,那么它是必须的。设置 -n worker_name 与 --hostname=default@%h 相同。
终端 - 选项卡 2:
celery -A proj worker -Q feeds -l debug -n feeds_worker
这将启动第二个 worker,它从 feeds 队列中消费任务。请注意 -n feeds_worker,如果您使用 -l debug(日志级别 = debug)运行,您将看到两个 worker 正在同步。
终端 - 选项卡 3:
celery -A proj beat -l debug
这将启动节拍,根据您的 CELERYBEAT_SCHEDULE 中的时间表执行任务。
我不必更改任务或 CELERYBEAT_SCHEDULE。
例如,这是我的 CELERYBEAT_SCHEDULE 应该进入提要队列的任务的样子:
CELERYBEAT_SCHEDULE = {
...
'update_feeds': {
'task': 'arena.social.tasks.Update',
'schedule': crontab(minute='*/6'),
},
...
}
如您所见,无需添加 'options': {'routing_key': 'long_tasks'} 或指定它应该进入的队列。另外,如果您想知道为什么 Update 是大写的,那是因为它是一个自定义任务,被定义为 celery.Task 的子类。
更新 Celery 5.0+
Celery 自版本 5 以来进行了一些更改,这里是任务路由的更新设置。
如何创建队列?
Celery 可以自动创建队列。它非常适合简单的情况,其中路由的 celery 默认值是可以的。
task_create_missing_queues=True 或者,如果您使用的是 django 设置并且您在 CELERY_ 键下命名空间所有 celery 配置,CELERY_TASK_CREATE_MISSING_QUEUES=True。请注意,它默认处于启用状态。
自动计划任务路由
配置 celery 应用程序后:
celery_app.conf.beat_schedule = {
"some_scheduled_task": {
"task": "module.path.some_task",
"schedule": crontab(minute="*/10"),
"options": {"queue": "queue1"}
}
}
自动任务路由
Celery 应用程序仍然需要先配置,然后:
app.conf.task_routes = {
"module.path.task2": {"queue": "queue2"},
}
任务的手动路由
如果您想动态路由任务,那么在发送任务时指定队列:
from module import task
def do_work():
# do some work and launch the task
task.apply_async(args=(arg1, arg2), queue="queue3")
可以在此处找到有关重新路由的更多详细信息: https://docs.celeryproject.org/en/stable/userguide/routing.html
关于在这里调用任务: https://docs.celeryproject.org/en/stable/userguide/calling.html
关于python - 本地主机上的 Django/Celery 多个队列 - 路由不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23129967/
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2
如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
我没有找到太多关于如何执行此操作的信息,尽管有很多关于如何使用像这样的redirect_to将参数传递给重定向的建议:action=>'something',:controller=>'something'在我的应用程序中,我在路由文件中有以下内容match'profile'=>'User#show'我的表演Action是这样的defshow@user=User.find(params[:user])@title=@user.first_nameend重定向发生在同一个用户Controller中,就像这样defregister@title="Registration"@user=Use
我正在尝试按0-9和a-z的顺序创建数字和字母列表。我有一组值value_array=['0','1','2','3','4','5','6','7','8','9','a','b','光盘','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','','u','v','w','x','y','z']和一个组合列表的数组,按顺序,这些数字可以产生x个字符,比方说三个list_array=[]和一个当前字母和数字组合的数组(在将它插入列表数组之前我会把它变成一个字符串,]current_combo['0','0','0']
是否有可能:before_filter:authenticate_user!||:authenticate_admin! 最佳答案 before_filter:do_authenticationdefdo_authenticationauthenticate_user!||authenticate_admin!end 关于ruby-on-rails-before_filter运行多个方法,我们在StackOverflow上找到一个类似的问题: https://