近期写的一个项目,整个项目采用的DDD(领域驱动)设计,所以刚开始设计的时候就将各个业务以聚合根的方式进行划分,以该业务场景为例,整体的业务简述为,当客户进行付款以后,创建一个付款单,然后由财务手动将付款单与发货单进行账务冲抵和关联,同时还需要针对付款的客户及企业的余额进行相应的变动,所以,当付款单和发货单进行冲抵业务的时候,客户及其企业的待付款金额将会根据冲抵的金额,进行变动,所以该业务的主要操作是首先针对发货单的待付款金额进行冲抵扣减,此时操作的聚合根为发货单的聚合根,而因为还需要同时针对用户的账户金额进行变动,所以在操作发货单的聚合根的时候,触发一个领域事件,而用户的聚合根订阅该事件,当该事件被触发的时候,用户的聚合根接收到事件,并随之进行相应的操作。
一般情况下,领域事件可以看作一个一对多的多播事件即一方触发多方进行响应,一个聚合根发生改变并且触发领域事件的时候,其他与之关联的聚合根都将订阅该事件,在被触发的时候进行响应并对自身进行对应的操作,而且事件一般不会存在返回值的情况,所以订阅方的业务是否执行成功,失败后需要进行什么样的操作,可以根据业务的不同进行不同的操作,如果是需要强一致性的业务,就需要考虑操作异常的处理。如果是一致性不强的业务,则可以考虑自身重试等机制。而目前该项目所遇到的就是强一致性的业务需求,那么只能一荣俱荣,一损俱损。
在领域事件的是先方面,我采用的是NetCore项目中比较流行的MediatR组件(一种简单的实现进程内的消息传递机制的类库),采用MediatR的消息通知机制,在进行数据操作的时候,添加并触发领域事件,从而实现领域事件的触发以及订阅处理,同时采用EF的事务来确保数据库在操作数据时候的一致性。
//业务代码
/// <summary>
/// 业务开始
/// </summary>
/// <returns></returns>
public async Task Task(CancellationToken token=default)
{
// 下面所用的未声明对象均有DI生成。
// DbCotext 继承 IUnitOfWork,并且通过IOC将其生命周期设为Scope(请求域)
//_repository 为聚合根的仓储类,在实例化时注入IUnitOfWork进行相应的数据库操作。
//具体略
var db = _repository.UnitOfWork as TestDbContext;
using (var transaction=await db.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted,token))
{
A a = new A();
a.AddDomainEvent(new TestEvent());
await _repository.AddAsync(a);
await db.CommitTransactionAsync(transaction, token);
}
}
//数据库上下文部分方法
/// <summary>
/// 异步提交事务
/// </summary>
/// <param name="transaction"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CommitTransactionAsync(IDbContextTransaction transaction,CancellationToken cancellationToken=default)
{
try
{
await EventTrigger(cancellationToken);
await SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
await RollbackTransactionAsync(cancellationToken);
throw;
}
finally
{
transaction.Dispose();
transaction = null;
}
}
/// <summary>
/// 事件触发器
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
private async Task EventTrigger(CancellationToken cancellationToken = default(CancellationToken))
{
var mediator =_serviceProvider.GetService<IMediator>()!;
await DispatchDomainEventAsync(mediator,cancellationToken);
}
/// <summary>
/// 调度领域事件
/// </summary>
/// <param name="mediator"></param>
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
private async Task DispatchDomainEventAsync(IMediator mediator,CancellationToken cancellationToken = default(CancellationToken))
{
//当前上下文的所有添加了领域事件的聚合根
List<EntityEntry<IAggregateRoot>> domainEntries = this.ChangeTracker
.Entries<IAggregateRoot>()
.Where(x => x.Entity.DomainEvents.Any())
.ToList();
//获取领域事件
IEnumerable<INotification> domainEvents = domainEntries.SelectMany(x => x.Entity.DomainEvents).ToList();
foreach (var domainEntry in domainEntries)
{
domainEntry.Entity.ClearDomainEvent();
}
//发送事件
var tasks = domainEvents.Select(async domainEvent =>
{
await mediator.Publish(domainEvent, cancellationToken);
});
//同时执行
await Task.WhenAll(tasks);
}
//订阅方
public class TestEventHandler: INotificationHandler<TestEvent>
{
#region fields
private readonly IBRepository _repository;
#endregion
#region ctor
/// <summary>
/// 事件处理方
/// </summary>
/// <param name="repository"></param>
public TestEventHandler(IBRepository repository)
{
_repository = repository;
}
#endregion
#region 处理程序
/// <summary>
/// 处理程序
/// </summary>
/// <param name="notification"></param>
/// <param name="cancellationToken"></param>
public async Task Handle(TestEvent eventData, CancellationToken cancellationToken)
{
// 参数 eventData 是可以传递数据的,此示例省略
B b = new B();
b.Num = 1;
await _repository.AddAsync(b);
}
#endregion
}
通过上面代码可以推断出,这次业务首先在操作之前开启ef事务,确保数据,一致性,然后在聚合根A进行保存之前触发领域事件,然后通过MediatR对事件进行调度,通知订阅方,而订阅方则根据自身的情况,实现自身的仓储,对操作进行处理。最后通过统一保存,提交事务,确保数据的一致性。
在业务代码实现以后,就针对该项业务进行测试,为了保险起见,专门针对数据一致性进行了测试,而结果大失所望,在数据进行保存的时候,故意调整了表结构的表A在保存的时候报错了没有将数据添加成功,而未调整的表B,则正常添加进了数据,数据的一致性并没有确保成功。这整个事情就变得很邪门了。而后就开始我的爬坑之旅。
起初,我以为是因为在进行调度的时候,采用了异步+Task的方式对领域事件进行了调度操作,所以导致事件在进行处理的时候和主方法的数据库上下不是一个导致的,所以针对数据库上下文的注入方式进行了排查,最后结果是 事件订阅处理方的数据库上下文和主方法的数据库上下文为同一个实例,所以不存在生命周期或不是实例不同的问题。
因为整个项目的架构都是我自己搭建的,出于对自身能力的怀疑,于是就有上面Demo的诞生,上面的Demo是我根据思路又重新调整后写的,结果神奇的一幕出现了,上面的框架事务生效了!!!!(但是这又是另外一个坑,不过不知道是不是负负得正把,反正促使我找到了真正的问题。)
在这个阶段,我进行了疯狂的调试,在调试的时候,特意输出了EF的Debug日志。从事务开启,到事务保存前创建事务保存点,再到保存,报错,回滚,删除事务保存点,这些日志我全都看到EF输出出来了并且排查了一遍,各种操作层出不穷,不再赘述,反正没有解决。
不得不说我飘了,我真真切切的开始怀疑过EFCore,甚至把这部分源码以及文档看了一遍,结果还是没看出什么所以然来。还是无果。
最后!!!!我要感恩的是马桶,在我一次次一天天的失败后,与 昨晚(今天凌晨)12:30在我心灰意冷关了电脑以后,坐在马桶上思考解决方案,随手用手机搜了一下Mysql事务打开了一篇博客,具体博客内容我忘了,是一篇JAVA的,但是核心内容是在JAVA中开启事务不管用的情况,一下我就来劲了,仔细一看,卧槽,我怀疑这个,怀疑那个,为毛就是没有怀疑过是Mysql的问题呢?Mysql的存储引擎,我所使用的版本是5.7.26,它默认的存储引擎是MyISAM的,这玩意它不支持事务啊!!!!!它不支持!!!
在我恶补了Mysql数据库引擎区别以后,我将数据库的存储引擎改为InnoDB后,完美!!!解决了!!(具体区别可以去搜一下,网上到处都是,烂大街了都,我就不复制别人的了)两周,整整两周,只要有时间,就在电脑前摸索研究这个问题,最后却发现是这么一个不起眼的问题导致的。却也说明了我个人对数据库知识的薄弱,后期需要恶补数据库。
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/
我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search
我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的
当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
-if!request.path_info.include?'A'%{:id=>'A'}"Text"-else"Text"“文本”写了两次。我怎样才能只写一次并同时检查path_info是否包含“A”? 最佳答案 有两种方法可以做到这一点。使用部分,或使用content_forblock:如果“文本”较长,或者是一个重要的子树,您可以将其提取到一个部分。这会使您的代码变干一点。在给出的示例中,这似乎有点矫枉过正。在这种情况下更好的方法是使用content_forblock,如下所示:-if!request.path_info.inc
我看到其他人也遇到过类似的问题,但没有一个解决方案对我有用。0.3.14gem与其他gem文件一起存在。我已经完全按照此处指示完成了所有操作:https://github.com/brianmario/mysql2.我仍然得到以下信息。我不知道为什么安装程序指示它找不到include目录,因为我已经检查过它存在。thread.h文件存在,但不在ruby目录中。相反,它在这里:C:\RailsInstaller\DevKit\lib\perl5\5.8\msys\CORE\我正在运行Windows7并尝试在Aptana3中构建我的Rails项目。我的Ruby是1.9.3。$gemin