过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。
首先在开始之前,先看看如何通过脚本分配多种调度计划,先看下表:
| 方法 | 描述 |
|---|---|
| EveryMinute() | 每分钟执行一次任务 |
| EveryFiveMinutes(); | 每五分钟执行一次任务 |
| EveryTenMinutes(); | 每十分钟执行一次任务 |
| EveryThirtyMinutes() | 每半小时执行一次任务 |
| Hourly(); | 每小时执行一次任务 |
HourlyAt(10) |
每一个小时的第 10 分钟运行一次 |
Daily() |
每到午夜执行一次任务 |
| DailyAt("3:00") | 在 3:00 执行一次任务 |
| TwiceDaily(1, 3) | 在 1:00 和 3:00 分别执行一次任务 |
Weekly() |
每周执行一次任务 |
Monthly() |
每月执行一次任务 |
| MonthlyOn(4, "3:00") | 在每个月的第四天的 3:00 执行一次任务 |
Quarterly() |
每季度执行一次任务 |
Yearly() |
每年执行一次任务 |
Timezone("utc") |
设置utc时区 |
举个例子,在工作日每三秒在时间8:00-23:30内执行任务。脚本如下:
parser.TimeZone(""utc"")
.Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")
额外的限制条件列表如下:
| 方法 | 描述 |
|---|---|
Weekdays() |
限制任务在工作日 |
Sundays() |
限制任务在星期日 |
Mondays() |
限制任务在星期一 |
Tuesdays() |
限制任务在星期二 |
Wednesdays() |
限制任务在星期三 |
Thursdays() |
限制任务在星期四 |
Fridays() |
限制任务在星期五 |
Saturdays() |
限制任务在星期六 |
When( function(lastExecTime)) |
限制任务基于一个script脚本返回为真的验证 |
|
限制任务基于一个script脚本返回为假的验证 |
举个例子,在工作日每三秒在时间8:00-23:30内执行任务。如果设置When返回为true,skip返回false 就会执行,脚本如下:
parser.TimeZone(""utc"")
.When(function(lastExecTime){
return true;
})
.Skip(
function(lastExecTime){
return false;
})
.Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")
然后在function 脚本中支持DateUtils对象,可以针对lastExecTime进行逻辑判断,比如是否是周末:DateUtils.IsWeekend(lastExecTime), 是否是今天DateUtils.IsToday(lastExecTime),代码如下:
parser.TimeZone(""utc"")
.When(function(lastExecTime){
return DateUtils.IsToday(lastExecTime);
})
.Skip(
function(lastExecTime){
return DateUtils.IsWeekend(lastExecTime);
})
.Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")
surging微服务引擎是支持后台管理托管服务的,如果要基于BackgroundService编写任务调度,那服务就要继承BackgroundServiceBehavior,还要继承ISingleInstance以设置注入单例模式,
首先,创建接口服务,这样就可以远程添加任务,开启关闭服务了,代码如下:
[ServiceBundle("Background/{Service}")]
public interface IWorkService : IServiceKey
{
Task<bool> AddWork(Message message);
Task StartAsync();
Task StopAsync();
}
然后创建业务领域服务,以下代码是通过规则引擎自定义脚本设置执行频率,并且可以设置execsize 以标识同时执行任务的大小,通过以下业务逻辑代码大家可以扩展支持持久化。
public class WorkService : BackgroundServiceBehavior, IWorkService, ISingleInstance
{
private readonly ILogger<WorkService> _logger;
private readonly Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>> _queue = new Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>>();
private readonly ConcurrentDictionary<string, DateTime> _keyValuePairs = new ConcurrentDictionary<string, DateTime>();
private readonly IServiceProxyProvider _serviceProxyProvider;
private AtomicLong _atomic=new AtomicLong(1);
private const int EXECSIZE = 1;
private CancellationToken _token;
public WorkService(ILogger<WorkService> logger, IServiceProxyProvider serviceProxyProvider)
{
_logger = logger;
_serviceProxyProvider = serviceProxyProvider;
/* var script = @"parser
.Weekdays().SecondAt(3).Between(""8:00"", ""22:00"")";*/
var script = @"parser
.TimeZone(""utc"")
.When(
function(lastExecTime){
return DateUtils.IsToday(lastExecTime);
}).Skip(
function(lastExecTime){
return DateUtils.IsWeekend(lastExecTime);
}).Weekdays().SecondAt(3).Between(""8:00"", ""23:30"")";
var ruleWorkflow = GetSchedulerRuleWorkflow(script);
var messageId = Guid.NewGuid().ToString();
_keyValuePairs.AddOrUpdate(messageId, DateTime.Now, (key, value) => DateTime.Now);
_queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(new Message() { MessageId= messageId,Config=new SchedulerConfig() { IsPersistence=true} }, GetRuleEngine(ruleWorkflow), ruleWorkflow));
}
public Task<bool> AddWork(Message message)
{
var ruleWorkflow = GetSchedulerRuleWorkflow(message.Config.Script);
_keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
_queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, GetRuleEngine(ruleWorkflow), ruleWorkflow));
return Task.FromResult(true);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
_token = stoppingToken;
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
_queue.TryDequeue(out Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>? message);
if (message != null)
{
var parser = await GetParser(message.Item3, message.Item2);
await PayloadSubscribe(parser, message.Item1, message.Item2, message.Item3);
_keyValuePairs.TryGetValue(message.Item1.MessageId, out DateTime dateTime);
parser.Build(dateTime == DateTime.MinValue ? DateTime.Now : dateTime);
}
if (!_token.IsCancellationRequested && (message == null || _atomic.GetAndAdd(1) == EXECSIZE))
{
_atomic = new AtomicLong(1);
await Task.Delay(1000, stoppingToken);
}
}
catch (Exception ex){
_logger.LogError("WorkService execute error, message:{message} ,trace info:{trace} ", ex.Message, ex.StackTrace);
}
}
public async Task StartAsync()
{
if (_token.IsCancellationRequested)
{
await base.StartAsync(_token);
}
}
public async Task StopAsync()
{
if (!_token.IsCancellationRequested)
{
await base.StopAsync(_token);
}
}
private async Task PayloadSubscribe(RulePipePayloadParser parser, Message message, RulesEngine.RulesEngine rulesEngine, SchedulerRuleWorkflow ruleWorkflow)
{
parser.HandlePayload().Subscribe(async (temperature) =>
{
try
{
if (temperature)
{
await ExecuteByPlanAsyn(message);
_logger.LogInformation("Worker exec at: {time}", DateTimeOffset.Now);
}
}
catch (Exception ex) { }
finally
{
if (message.Config.IsPersistence || (!temperature && !message.Config.IsPersistence))
_queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, rulesEngine, ruleWorkflow));
}
});
}
private async Task<bool> ExecuteByPlanAsyn(Message message)
{
var result = false;
var isExec = true;
try
{
if (!string.IsNullOrEmpty(message.RoutePath))
{
var serviceResult = await _serviceProxyProvider.Invoke<object>(message.Parameters, message.RoutePath, message.ServiceKey);
bool.TryParse(serviceResult?.ToString(), out result);
isExec = true;
}
}
catch { }
finally
{
if (isExec && message.Config.IsPersistence)
_keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
else if (!message.Config.IsPersistence)
_keyValuePairs.TryRemove(message.MessageId, out DateTime dateTime);
}
return result;
}
private async Task<RulePipePayloadParser> GetParser(SchedulerRuleWorkflow ruleWorkflow, RulesEngine.RulesEngine engine)
{
var payloadParser = new RulePipePayloadParser();
var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });
if (ruleResult.Exception != null && _logger.IsEnabled(LogLevel.Error))
_logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);
return payloadParser;
}
private RulesEngine.RulesEngine GetRuleEngine(SchedulerRuleWorkflow ruleWorkFlow)
{
var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };
var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkFlow.GetWorkflow() }, null, reSettingsWithCustomTypes);
return result;
}
private SchedulerRuleWorkflow GetSchedulerRuleWorkflow(string script)
{
var result = new SchedulerRuleWorkflow("1==1");
if (!string.IsNullOrEmpty(script))
{
result = new SchedulerRuleWorkflow(script);
}
return result;
}
}
因为工作繁忙,微服务平台暂时搁置,等公司基于surging 的物联网项目上线后,再投入时间研发,surging 一直开发中未曾放弃,也许你没看到的版本才是最强大的。之前的QQ群被封了,如果感兴趣可以加:744677125
开源地址:https://github.com/fanliang11/surging
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub
我想用ruby编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序
我正在使用puppet为ruby程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这
我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是
我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search
我在理解Enumerator.new方法的工作原理时遇到了一些困难。假设文档中的示例:fib=Enumerator.newdo|y|a=b=1loopdoy[1,1,2,3,5,8,13,21,34,55]循环中断条件在哪里,它如何知道循环应该迭代多少次(因为它没有任何明确的中断条件并且看起来像无限循环)? 最佳答案 Enumerator使用Fibers在内部。您的示例等效于:require'fiber'fiber=Fiber.newdoa=b=1loopdoFiber.yieldaa,b=b,a+bendend10.times.m
几个月前,我读了一篇关于rubygem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题: