Prepare 阶段和 Commit 阶段。
另外,InnoDB 使用的锁机制是悲观锁。在悲观锁中,事务是在操作之初执行加锁操作,如果锁资源被其它事务占用了,则该事务会被阻塞。
基于这两点,我们不难推断出,两个事务如果都进入了 Prepare 阶段,则意味着它们之间是没有锁冲突的,在从库重放时可并行执行。这就是 Commit-Parent-Based 方案的核心思想。
具体实现上:
Trx1 ------------P----------C-------------------------------->
|
Trx2 ----------------P------+---C---------------------------->
| |
Trx3 -------------------P---+---+-----C---------------------->
| | |
Trx4 -----------------------+-P-+-----+----C----------------->
| | | |
Trx5 -----------------------+---+-P---+----+---C------------->
| | | | |
Trx6 -----------------------+---+---P-+----+---+---C---------->
| | | | | |
Trx7 -----------------------+---+-----+----+---+-P-+--C------->
| | | | | | |
示例中的 Trx 指的是事务,P 指的是事务在进行 Prepare 阶段之前,读取 commit-parent 的时间点。C 指的是事务在进行 Commit 阶段之前,增加全局计数器的时间点。
下面看看这 7 个事务的并行执行情况。
Trx1 -----L---------C------------>
Trx2 ----------L---------C------->
反之,则不可并行重放,例如,
Trx1 -----L----C----------------->
Trx2 ---------------L----C------->
这里的 L 代表锁区间的开始点,C 代表锁区间的结束点。
在具体实现上,主库引入了以下 4 个变量:
transaction.sequence_number = ++global.transaction_counter
序列号不是一直递增的,每切换一个 binlog,都会将 transaction.sequence_number 重置为 1。
global.max_committed_transaction = max(global.max_committed_transaction,
transaction.sequence_number)
transaction.last_committed = global.max_committed_transaction
Trx1: last_committed=0 sequence_number=1
Trx2: last_committed=0 sequence_number=2
Trx3: last_committed=0 sequence_number=3
Trx4: last_committed=1 sequence_number=4
Trx5: last_committed=2 sequence_number=5
Trx6: last_committed=2 sequence_number=6
Trx7: last_committed=5 sequence_number=7
transaction.last_committed < transaction_sequence[0].sequence_number
最后,回到示例中的 7 个事务,结合 binlog 中的 last_committed 和 sequence_number,我们看看这 7 个事务的并行执行情况。
writeset 和 WRITESET 两者的区别,前者指的是事务的写集合,后者则特指 WRITESET 方案。
void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent) {
Rpl_transaction_write_set_ctx *write_set_ctx =
thd->get_transaction()->get_transaction_write_set_ctx();
std::vector<uint64> *writeset = write_set_ctx->get_write_set();
#ifndef NDEBUG
/* 空事务的写集合必须为空 */
if (is_empty_transaction_in_binlog_cache(thd)) assert(writeset->size() == 0);
#endif
/*
判断一个事务能否使用 WRITESET 方案
*/
bool can_use_writesets =
// 事务写集合的大小不为 0 或者事务为空事务
(writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
is_empty_transaction_in_binlog_cache(thd)) &&
// 事务的 transaction_write_set_extraction 必须与全局设置一致
(global_system_variables.transaction_write_set_extraction ==
thd->variables.transaction_write_set_extraction) &&
// 不能被其它表外键关联
!write_set_ctx->get_has_related_foreign_keys() &&
// 事务写集合的大小不能超过 binlog_transaction_dependency_history_size
!write_set_ctx->was_write_set_limit_reached();
bool exceeds_capacity = false;
if (can_use_writesets) {
/*
检查 m_writeset_history 加上事务写集合的大小是否超过 m_writeset_history 的上限,
m_writeset_history 的上限由参数 binlog_transaction_dependency_history_size 决定
*/
exceeds_capacity =
m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
/*
计算所有冲突行中最大的 sequence_number,并将被修改行的哈希值插入 m_writeset_history
*/
int64 last_parent = m_writeset_history_start;
for (std::vector<uint64>::iterator it = writeset->begin();
it != writeset->end(); ++it) {
Writeset_history::iterator hst = m_writeset_history.find(*it);
if (hst != m_writeset_history.end()) {
if (hst->second > last_parent && hst->second < sequence_number)
last_parent = hst->second;
hst->second = sequence_number;
} else {
if (!exceeds_capacity)
m_writeset_history.insert(
std::pair<uint64, int64>(*it, sequence_number));
}
}
// 如果表上都存在主键,则会取 last_parent 和 commit_parent 的较小值作为事务的 commit_parent。
if (!write_set_ctx->get_has_missing_keys()) {
commit_parent = std::min(last_parent, commit_parent);
}
}
if (exceeds_capacity || !can_use_writesets) {
m_writeset_history_start = sequence_number;
m_writeset_history.clear();
}
}
该函数的处理流程如下:
m_writeset_history + writeset 的大小是否超过 binlog_transaction_dependency_history_size 的设置。
分析压测结果,我们可以得出以下结论。
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 16
slave_preserve_commit_order = ON
下面看看这三个参数的的具体含义。
slave_parallel_type
设置从库并行复制的类型。该参数有以下取值:
binlog_transaction_dependency_tracking = WRITESET_SESSION
transaction_write_set_extraction = XXHASH64
binlog_transaction_dependency_history_size = 25000
binlog_format = ROW
注意,基于 WRITESET 的并行复制方案,只在 binlog 格式为 ROW 的情况下才生效。
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
我明白了:x,(y,z)=1,*[2,3]x#=>1y#=>2z#=>nil我想知道为什么z的值为nil。 最佳答案 x,(y,z)=1,*[2,3]右侧的splat*是内联扩展的,所以它等同于:x,(y,z)=1,2,3左边带括号的列表被视为嵌套赋值,所以它等价于:x=1y,z=23被丢弃,而z被分配给nil。 关于ruby-带括号和splat运算符的并行赋值,我们在StackOverflow上找到一个类似的问题: https://stackoverflow
假设您在Ruby中执行此操作:ar=[1,2]x,y=ar然后,x==1和y==2。是否有一种方法可以在我自己的类中定义,从而产生相同的效果?例如rb=AllYourCode.newx,y=rb到目前为止,对于这样的赋值,我所能做的就是使x==rb和y=nil。Python有这样一个特性:>>>classFoo:...def__iter__(self):...returniter([1,2])...>>>x,y=Foo()>>>x1>>>y2 最佳答案 是的。定义#to_ary。这将使您的对象被视为要分配的数组。irb>o=Obje
我看到其他人也遇到过类似的问题,但没有一个解决方案对我有用。0.3.14gem与其他gem文件一起存在。我已经完全按照此处指示完成了所有操作:https://github.com/brianmario/mysql2.我仍然得到以下信息。我不知道为什么安装程序指示它找不到include目录,因为我已经检查过它存在。thread.h文件存在,但不在ruby目录中。相反,它在这里:C:\RailsInstaller\DevKit\lib\perl5\5.8\msys\CORE\我正在运行Windows7并尝试在Aptana3中构建我的Rails项目。我的Ruby是1.9.3。$gemin
我想编写一个ruby脚本来递归复制目录结构,但排除某些文件类型。因此,给定以下目录结构:folder1folder2file1.txtfile2.txtfile3.csfile4.htmlfolder2folder3file4.dll我想复制这个结构,但不包含.txt和.cs文件。因此,生成的目录结构应如下所示:folder1folder2file4.htmlfolder2folder3file4.dll 最佳答案 您可以使用查找模块。这是一个代码片段:require"find"ignored_extensions=[".cs"
我有一个应用程序正在从Ruby迁移到JRuby(由于需要通过Java提供更好的Web服务安全支持)。我使用的gem之一是daemons创建后台作业。问题在于它使用fork+exec来创建后台进程,但这对JRuby来说是禁忌。那么-是否有用于创建后台作业的替代gem/wrapper?我目前的想法是只从shell脚本调用rake并让rake任务永远运行......提前致谢,克里斯。更新我们目前正在使用几个与Java线程相关的包装器,即https://github.com/jmettraux/rufus-scheduler和https://github.com/philostler/acts
我已经开始使用mysql2gem。我试图弄清楚一些基本的事情——其中之一是如何明确地执行事务(对于批处理操作,比如多个INSERT/UPDATE查询)。在旧的ruby-mysql中,这是我的方法:client=Mysql.real_connect(...)inserts=["INSERTINTO...","UPDATE..WHEREid=..",#etc]client.autocommit(false)inserts.eachdo|ins|beginclient.query(ins)rescue#handleerrorsorabortentirelyendendclient.commi
之前有人问过这个问题,我发现了以下clip关于如何一次设置一个类对象的所有属性,但由于批量分配保护,这在Rails中是不可能的。(例如,您不能Object.attributes={})有没有一种很好的方法可以将一个类的属性合并到另一个类中?object1.attributes=object2.attributes.inject({}){|h,(k,v)|h[k]=vifObjectModel.column_names.include?(k);h}谢谢。 最佳答案 利用assign_attributes使用:without_prote
(跟进我之前的问题,Ruby:howcanIcopyavariablewithoutpointingtothesameobject?)我正在编写一个简单的Ruby程序来在.svg文件中进行一些替换。第一步是从文件中提取信息并将其放入数组中。为了避免每次调用此函数时都从磁盘读取文件,我尝试使用memoize设计模式-在第一次调用后的每次调用中都使用缓存结果。为此,我使用了一个在函数之前定义的全局变量。但是,即使我在返回局部变量之前将该变量.dup为局部变量,调用该变量的函数仍在修改全局变量。这是我的实际代码:#memoizetokeepfromhavingtoreadoriginalfi