
文章目录
Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。Asynk任务采用的是tokio的特性,Worker工作在tokio下。Blocking任务使用的是std::thread,Worker工作在一个单独的线程。
添加Fang到你的Cargo.toml文件中
注意Fang仅支持rust 1.62+版本
[dependencies]
fang = { version = "0.7" , features = ["blocking"], default-features = false }
[dependencies]
fang = { version = "0.7" , features = ["asynk"], default-features = false }
fang = { version = "0.7" }
这里需要使用
Diesel CLI来完成数据库的迁移,将在后面的文章中介绍
在你的Postgres DB中创建fang_tasks表,然后运行以下脚本
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');
CREATE TABLE fang_tasks (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
metadata jsonb NOT NULL,
error_message TEXT,
state fang_task_state default 'new' NOT NULL,
task_type VARCHAR default 'common' NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
CREATE TABLE fang_periodic_tasks (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
metadata jsonb NOT NULL,
period_in_seconds INTEGER NOT NULL,
scheduled_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
这些文件可以在源码目录
migrations中找到,github搜Fang,然后进入下载源码。
每个要被Fang执行的任务都必须实现fang::Runnable特质,特质实现#[typetag::serde]使之具有反序列化任务的属性。
use fang::Error;
use fang::Runnable;
use fang::typetag;
use fang::PgConnection;
use fang::serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask {
pub number: u16,
}
#[typetag::serde]
impl Runnable for MyTask {
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
println!("the number is {}", self.number);
Ok(())
}
}
run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。
每个要被Fang执行的任务都必须实现fang::AsyncRunnable特质
注意不要实现两个同名的AsyncRunnable,这会导致typetag失败
use fang::AsyncRunnable;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
pub number: u16,
}
#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(())
}
// this func is optional to impl
// Default task-type it is common
fn task_type(&self) -> String {
"my-task-type".to_string()
}
}
需要使用Queue::enqueue_task来入队列
use fang::Queue;
...
Queue::enqueue_task(&MyTask { number: 10 }).unwrap();
上面的示例在每次调用时都会创建一个新的 postgres 连接
重用相同的 postgres 连接来将多个任务排入队列
let queue = Queue::new();
for id in &unsynced_feed_ids {
queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();
}
或者使用PgConnection结构体
Queue::push_task_query(pg_connection, &new_task).unwrap();
使用AsyncQueueable::insert_task来入队,可以根据你自己后端来进行操作,默认为Postgres
use fang::asynk::async_queue::AsyncQueue;
use fang::NoTls;
use fang::AsyncRunnable;
// 创建异步队列
let max_pool_size: u32 = 2;
let mut queue = AsyncQueue::builder()
// Postgres 数据库 url
.uri("postgres://postgres:postgres@localhost/fang")
// 允许的最大连接数控i昂
.max_pool_size(max_pool_size)
// 如果希望任务中的唯一性,则为false
.duplicated_tasks(true)
.build();
// 要进行操作之前,总是要先连接
queue.connect(NoTls).await.unwrap();
举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。
let task = AsyncTask { 8 };
let task_returned = queue
.insert_task(&task as &dyn AsyncRunnable)
.await
.unwrap();
每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
使用WorkerPool来启动Worker,WorkerPool::new接收一个整型参数,Worker的数量
use fang::WorkerPool;
WorkerPool::new(10).start();
使用shutdown停止线程
use fang::WorkerPool;
worker_pool = WorkerPool::new(10).start().unwrap;
worker_pool.shutdown()
每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
使用AsyncWorkerPool来启动Worker
use fang::asynk::async_worker_pool::AsyncWorkerPool;
// 必须创建一个队列
// 插入一些任务
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size)
.queue(queue.clone())
.build();
pool.start().await;
在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用WorkerPool.new_with_params来创建,它接受两个参数——工人数量和WorkerParams结构体。
WorkerParams的定义是这样的
pub struct WorkerParams {
pub retention_mode: Option<RetentionMode>,
pub sleep_params: Option<SleepParams>,
pub task_type: Option<String>,
}
pub enum RetentionMode {
KeepAll,
RemoveAll,
RemoveFinished,
}
pub struct SleepParams {
pub sleep_period: u64,
pub max_sleep_period: u64,
pub min_sleep_period: u64,
pub sleep_step: u64,
}
使用AsyncWorkerPool的builder方法即可。需要链式调用,创建一个AsyncWorkerPool,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。
可以指定Worker类型,来指定指定类型Worker执行指定类型的任务
在Runnable特质中添加方法
...
#[typetag::serde]
impl Runnable for MyTask {
fn run(&self) -> Result<(), Error> {
println!("the number is {}", self.number);
Ok(())
}
fn task_type(&self) -> String {
"number".to_string()
}
}
设置task_type
let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());
WorkerPool::new_with_params(10, worker_params).start();
没有设置task_type的Worker可以执行任何任务
功能与Blocking任务相同。使用AsyncWorker的builer来设置
默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:
pub enum RetentionMode {
KeepAll, \\ 不删除任务
RemoveAll, \\ 删除所有任务
RemoveFinished, \\ 默认值,完成就删除
}
使用set_retention_mode设置保留模式
let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);
WorkerPool::new_with_params(10, worker_params).start();
使用AsyncWorker的builder。
使用 useSleepParams来配置睡眠值:
pub struct SleepParams {
pub sleep_period: u64, \\ 默认值 5
pub max_sleep_period: u64, \\ 默认值 15
pub min_sleep_period: u64, \\ 默认值 5
pub sleep_step: u64, \\ 默认值 5
}
如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。
使用set_sleep_params来设置
let sleep_params = SleepParams {
sleep_period: 2,
max_sleep_period: 6,
min_sleep_period: 2,
sleep_step: 1,
};
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);
WorkerPool::new_with_params(10, worker_params).start();
使用AsyncWorker的builder。
如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建
fang_periodic_tasks表,就在本文安装那个部分。
use fang::Scheduler;
use fang::Queue;
let queue = Queue::new();
queue
.push_periodic_task(&SyncMyTask::default(), 120)
.unwrap();
queue
.push_periodic_task(&DeliverMyTask::default(), 60)
.unwrap();
Scheduler::start(10, 5);
在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。
Scheduler::start(10, 5)启动调度程序。它接受两个参数:
use fang::asynk::async_scheduler::Scheduler;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_queue::AsyncQueue;
// 在此之前构建一个Async队列
let schedule_in_future = Utc::now() + OtherDuration::seconds(5);
let _periodic_task = queue.insert_periodic_task(
&AsyncTask { number: 1 },
schedule_in_future,
10,
)
.await;
let check_period: u64 = 1;
let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder()
.check_period(check_period)
.error_margin_seconds(error_margin_seconds)
.queue(&mut queue as &mut dyn AsyncQueueable)
.build();
// 在其他线程或循环之前添加更多任务
// 调度程序循环
scheduler.start().await.unwrap();
以上就是本文的所有内容,介绍了Rust中借助Fang库来实现后台任务,进行后台任务的处理,还有定时任务,配置等。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
如果我使用ruby版本2.5.1和Rails版本2.3.18会怎样?我有基于rails2.3.18和ruby1.9.2p320构建的rails应用程序,我只想升级ruby的版本,而不是rails,这可能吗?我必须面对哪些挑战? 最佳答案 GitHub维护apublicfork它有针对旧Rails版本的分支,有各种变化,它们一直在运行。有一段时间,他们在较新的Ruby版本上运行较旧的Rails版本,而不是最初支持的版本,因此您可能会发现一些关于需要向后移植的有用提示。不过,他们现在已经有几年没有使用2.3了,所以充其量只能让更
我是ruby的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
我已经找到了几个使用datamapper的示例,并且能够让它们正常工作。不过,所有这些示例都是针对sqlite数据库的。我正在尝试将数据映射器与postgresql一起使用。我将datamapper中的调用从sqlite3更改为postgres,并且我已经安装了dm-postgres-adapter。但它仍然不起作用。我还需要做什么? 最佳答案 与SQLite不同,PostgreSQL不将数据库存储在单个文件中。在你拥有createdyourdatabase之后,尝试这样的事情:DataMapper.setup:default,{:
我对图像处理完全陌生。我对JPEG内部是什么以及它是如何工作一无所知。我想知道,是否可以在某处找到执行以下简单操作的ruby代码:打开jpeg文件。遍历每个像素并将其颜色设置为fx绿色。将结果写入另一个文件。我对如何使用ruby-vips库实现这一点特别感兴趣https://github.com/ender672/ruby-vips我的目标-学习如何使用ruby-vips执行基本的图像处理操作(Gamma校正、亮度、色调……)任何指向比“helloworld”更复杂的工作示例的链接——比如ruby-vips的github页面上的链接,我们将不胜感激!如果有ruby-
我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d