草庐IT

通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本

fanly11 2023-03-28 原文

简介

     过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。

调度频率设置

       首先在开始之前,先看看如何通过脚本分配多种调度计划,先看下表:

方法描述
EveryMinute() 每分钟执行一次任务
EveryFiveMinutes(); 每五分钟执行一次任务
EveryTenMinutes();  每十分钟执行一次任务
EveryThirtyMinutes() 每半小时执行一次任务
Hourly(); 每小时执行一次任务
HourlyAt(10) 每一个小时的第 10 分钟运行一次
Daily() 每到午夜执行一次任务
DailyAt("3:00") 在 3:00 执行一次任务
TwiceDaily(1, 3) 在 1:00 和 3:00 分别执行一次任务
Weekly() 每周执行一次任务
Monthly() 每月执行一次任务
MonthlyOn(4, "3:00") 在每个月的第四天的 3:00 执行一次任务
Quarterly() 每季度执行一次任务
Yearly() 每年执行一次任务
Timezone("utc") 设置utc时区

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 



举个例子,在工作日每三秒在时间8:00-23:30内执行任务。脚本如下:

parser.TimeZone(""utc"")
      .Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")

 额外的限制条件列表如下:

方法描述
Weekdays() 限制任务在工作日
Sundays() 限制任务在星期日
Mondays() 限制任务在星期一
Tuesdays() 限制任务在星期二
Wednesdays() 限制任务在星期三
Thursdays() 限制任务在星期四
Fridays() 限制任务在星期五
Saturdays() 限制任务在星期六
When( function(lastExecTime)) 限制任务基于一个script脚本返回为真的验证
Skip( function(lastExecTime)) 限制任务基于一个script脚本返回为假的验证

 

 

 

 

 

 

 

 

 

 

 

 

举个例子,在工作日每三秒在时间8:00-23:30内执行任务。如果设置When返回为true,skip返回false 就会执行,脚本如下:

parser.TimeZone(""utc"")
       .When(function(lastExecTime){
                return true;
            })
       .Skip(
             function(lastExecTime){
                return false;
            })
      .Weekdays()
      .SecondAt(3)
       .Between(""8:00"", ""23:30"")

然后在function 脚本中支持DateUtils对象,可以针对lastExecTime进行逻辑判断,比如是否是周末:DateUtils.IsWeekend(lastExecTime), 是否是今天DateUtils.IsToday(lastExecTime),代码如下:

 

parser.TimeZone(""utc"")
       .When(function(lastExecTime){
               return DateUtils.IsToday(lastExecTime);
            })
       .Skip(
             function(lastExecTime){
                return DateUtils.IsWeekend(lastExecTime);
            })
      .Weekdays()
      .SecondAt(3)
       .Between(""8:00"", ""23:30"")

 

编写调度服务

surging微服务引擎是支持后台管理托管服务的,如果要基于BackgroundService编写任务调度,那服务就要继承BackgroundServiceBehavior,还要继承ISingleInstance以设置注入单例模式,

首先,创建接口服务,这样就可以远程添加任务,开启关闭服务了,代码如下:

   [ServiceBundle("Background/{Service}")]
    public interface IWorkService : IServiceKey
    {
        Task<bool> AddWork(Message message);

         Task StartAsync();

        Task StopAsync();
    }

然后创建业务领域服务,以下代码是通过规则引擎自定义脚本设置执行频率,并且可以设置execsize 以标识同时执行任务的大小,通过以下业务逻辑代码大家可以扩展支持持久化。

public class WorkService : BackgroundServiceBehavior, IWorkService, ISingleInstance
    {
        private readonly ILogger<WorkService> _logger;
        private readonly Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>> _queue = new Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>>();
        private readonly ConcurrentDictionary<string, DateTime> _keyValuePairs = new ConcurrentDictionary<string, DateTime>();
        private readonly IServiceProxyProvider _serviceProxyProvider;
        private AtomicLong _atomic=new AtomicLong(1);
        private const int EXECSIZE = 1;
        private CancellationToken _token;

        public WorkService(ILogger<WorkService> logger, IServiceProxyProvider serviceProxyProvider)
        {
            _logger = logger;
            _serviceProxyProvider = serviceProxyProvider;
            /*   var script = @"parser
                               .Weekdays().SecondAt(3).Between(""8:00"", ""22:00"")";*/
            var script = @"parser
                              .TimeZone(""utc"")
                               .When(
                              function(lastExecTime){
                return DateUtils.IsToday(lastExecTime);
            }).Skip(
             function(lastExecTime){
                return DateUtils.IsWeekend(lastExecTime);
            }).Weekdays().SecondAt(3).Between(""8:00"", ""23:30"")";
            var ruleWorkflow = GetSchedulerRuleWorkflow(script);
            var messageId = Guid.NewGuid().ToString();
            _keyValuePairs.AddOrUpdate(messageId, DateTime.Now, (key, value) => DateTime.Now);
            _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(new Message() { MessageId= messageId,Config=new SchedulerConfig() {  IsPersistence=true} }, GetRuleEngine(ruleWorkflow), ruleWorkflow));

        }

        public  Task<bool> AddWork(Message message)
        {
            var ruleWorkflow = GetSchedulerRuleWorkflow(message.Config.Script);
            _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
            _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, GetRuleEngine(ruleWorkflow), ruleWorkflow));
            return Task.FromResult(true);
        }

        protected override async  Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                _token = stoppingToken;
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now); 
                _queue.TryDequeue(out Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>? message);
                if (message != null)
                {
                    var parser = await GetParser(message.Item3, message.Item2);
                    await PayloadSubscribe(parser, message.Item1, message.Item2, message.Item3);
                    _keyValuePairs.TryGetValue(message.Item1.MessageId, out DateTime dateTime);
                    parser.Build(dateTime == DateTime.MinValue ? DateTime.Now : dateTime);
                }
                if (!_token.IsCancellationRequested && (message == null || _atomic.GetAndAdd(1) == EXECSIZE))
                {
                    _atomic = new AtomicLong(1);
                    await Task.Delay(1000, stoppingToken);

                }
            }
            catch (Exception ex){
                _logger.LogError("WorkService execute error, message:{message} ,trace info:{trace} ", ex.Message, ex.StackTrace);
            }
        }

        public async Task StartAsync()
        {
            if (_token.IsCancellationRequested)
            { 
                await base.StartAsync(_token);
            }
        }

        public async Task StopAsync()
        {
            if (!_token.IsCancellationRequested)
            {
               await  base.StopAsync(_token);
            }
        }

        private async Task PayloadSubscribe(RulePipePayloadParser parser, Message message, RulesEngine.RulesEngine rulesEngine, SchedulerRuleWorkflow ruleWorkflow)
        {
            parser.HandlePayload().Subscribe(async (temperature) =>
            {
                try
                {
                    if (temperature)
                    {
                       await  ExecuteByPlanAsyn(message);
                        _logger.LogInformation("Worker exec at: {time}", DateTimeOffset.Now);

                    }
                }
                catch (Exception ex) { }
                finally
                {
                    if (message.Config.IsPersistence || (!temperature && !message.Config.IsPersistence))
                        _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, rulesEngine, ruleWorkflow));

                }
            });
        }

        private async Task<bool> ExecuteByPlanAsyn(Message message)
        {
            var result = false;
            var isExec = true;
            try
            {
                if (!string.IsNullOrEmpty(message.RoutePath))
                {
                    var serviceResult = await _serviceProxyProvider.Invoke<object>(message.Parameters, message.RoutePath, message.ServiceKey);
                    bool.TryParse(serviceResult?.ToString(), out result);
                    isExec = true;
                }
            }
            catch { }
            finally
            {
                if (isExec && message.Config.IsPersistence)
                    _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);
                else if (!message.Config.IsPersistence)
                    _keyValuePairs.TryRemove(message.MessageId, out DateTime dateTime);
            }
            return result;
        }

        private async Task<RulePipePayloadParser> GetParser(SchedulerRuleWorkflow ruleWorkflow, RulesEngine.RulesEngine engine)
        {
            var payloadParser = new RulePipePayloadParser();
            var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });
            if (ruleResult.Exception != null && _logger.IsEnabled(LogLevel.Error))
                _logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);
            return payloadParser;
        }

        private RulesEngine.RulesEngine GetRuleEngine(SchedulerRuleWorkflow ruleWorkFlow)
        {
            var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };
            var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkFlow.GetWorkflow() }, null, reSettingsWithCustomTypes);
            return result;
        }

        private SchedulerRuleWorkflow GetSchedulerRuleWorkflow(string script)
        {
            var result = new SchedulerRuleWorkflow("1==1");
            if (!string.IsNullOrEmpty(script))
            {
                result = new SchedulerRuleWorkflow(script);
            }
            return result;
        }
    }

总结

因为工作繁忙,微服务平台暂时搁置,等公司基于surging 的物联网项目上线后,再投入时间研发,surging 一直开发中未曾放弃,也许你没看到的版本才是最强大的。之前的QQ群被封了,如果感兴趣可以加:744677125

开源地址:https://github.com/fanliang11/surging

 

有关通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  3. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  4. ruby - 通过 erb 模板输出 ruby​​ 数组 - 2

    我正在使用puppet为ruby​​程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby​​不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这

  5. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  6. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  7. ruby - 通过 ruby​​ 进程共享变量 - 2

    我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是

  8. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  9. ruby-on-rails - Enumerator.new 如何处理已通过的 block ? - 2

    我在理解Enumerator.new方法的工作原理时遇到了一些困难。假设文档中的示例:fib=Enumerator.newdo|y|a=b=1loopdoy[1,1,2,3,5,8,13,21,34,55]循环中断条件在哪里,它如何知道循环应该迭代多少次(因为它没有任何明确的中断条件并且看起来像无限循环)? 最佳答案 Enumerator使用Fibers在内部。您的示例等效于:require'fiber'fiber=Fiber.newdoa=b=1loopdoFiber.yieldaa,b=b,a+bendend10.times.m

  10. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

随机推荐