MySQL5.5半同步复制实现原理
来自: http://blog.csdn.net//jiao_fuyou/article/details/17161725
在semi-sync replication中,master等待binlog被成功写入到至少一个slave的relay log之后才会继续执行commit,否则会出现等待。当超过预定的等待时间之后,semi-syncreplication会被禁用,切换回异步再提交。
Semi-sync replication在一定程度上保证提交的事务至少有一个已经传到slave上了。
google提出了两个解决方法:full synchronous replication和semi-synchronous replication。但是full sync不仅要保证事务已经传递到slave上,而且还要等到事务在slave上执行完毕才能返回,因此不被推荐。
Semi-sync replication主要流程:
Detailed Design:
Semi-synchronous commit protocol
Our proposed semi-synchronous replication works in the followingway:
-
commit the transaction
-
wait for the replica databases acknowledge that they already received the transaction - this step has a timeout
-
tell the client that the commit has been processed
在第2步中,如果replica 在预定的等待时间内没有返回信息,semi-sync replication会被禁用,切换回asynchronous replication。因此,一个transaction至少需要一次完成TCP环路的时间。
目前的mysqlreplication过程:
在slave端:
Slave调用safe_connect()去连接master;
COM_BINLOG_DUMP 获取master上的binlog信息,主要有:binlog_filename, binlog_pos, binlog_flag, server_id。
在master端:
处理COM_BINLOG_DUMP;
Mysql_binlog_send()向slave传送binlog事件。
因为binlog_flag是由slave发出的,并且在master中进行了设置后返回,所以semi-sync主要在binlog_flag上加了一个额外的头字节,用来判断slave是不是同步的目标。
semisync.h:
static const unsigned char kSyncHeader[2];
static constunsigned char kPacketMagicNum;
static constunsigned char kPacketFlagSync;
Packet 由三部分组成:
1.the magic num
2. thebinlog positon
3. the binlog filename
Master端的执行过程:
在master端创建了一棵搜索树用来存储等待事务。当一个事务提交,并且写到binlog中后,将(binlog_filename, binlog_pos)插入到搜索树中,这样做的目的是为了方便replication thread找出当前等待事务。当事务结束等待后,该事务记录就会被移除出搜索树。
Replication thread读取binlog event,并且检查binlog position是否在搜索树中,然后根据binlog position在搜索树中的位置,设置额外的头字节。
Semisync_master.cc:
int ActiveTranx::insert_tranx_node(const char*log_file_name, my_off_t log_file_pos)
{
…………………………
strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);
ins_node->log_name_[FN_REFLEN-1] = 0;
ins_node->log_pos_ = log_file_pos;
}
//读取slave返回的信息获取binlog的pos,用来确定slave读取事务是否已经完成。
intrepl_semi_report_binlog_update(Binlog_storage_param *param,
const char *log_file,
my_off_t log_pos, uint32 flags)
{
………………….
//为了存储log_file,log_pos,这样的目的是为了确定binlog还需要多长时间才能复制到slave
error=repl_semisync.writeTranxInBinlog(log_file, log_pos);
…………………
}
int ReplSemiSyncMaster::writeTranxInBinlog(const char*log_file_name,
my_off_t log_file_pos)
{
……….
lock();
//更新最新事务提交位置
if(commit_file_name_inited_)
{
int cmp =ActiveTranx::compare(log_file_name, log_file_pos,
commit_file_name_, commit_file_pos_);
…………..
//将(log_file_name,log_file_pos)插入到search tree
if (is_on())
{
assert(active_tranxs_ != NULL);
if(active_tranxs_->insert_tranx_node(log_file_name,log_file_pos))
{
………….
}
}
……….
unlock();
………
}
int repl_semi_report_commit(Trans_param *param)
{
……………………
return repl_semisync.commitTrx(binlog_name, param->log_pos);
…………………
}
CommitTrx()的作用是:如果开启了semi-sync,会让binlog-dump thread去等待slave返回的信息,如果超过了等待时间,禁用semi-sync,通知其他事务都无需等待返回信息。
int ReplSemiSyncMaster::commitTrx(const char*trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
//开始调用binlog_dump_thread,首先判断是否有新的semi-sync slave,如果有的话添加到semi-sync master的slave表中。判断完成后调用reportReplyBinlog,以确定该slave是否已经接收到所有的binlog event。
int repl_semi_binlog_dump_start(Binlog_transmit_param*param,
const char *log_file,
my_off_t log_pos)
{
bool semi_sync_slave= repl_semisync.is_semi_sync_slave();
if(semi_sync_slave)
{
repl_semisync.add_slave();
repl_semisync.reportReplyBinlog(param->server_id,log_file, log_pos);
}
…………………..
}
intreportReplyBinlog(uint32 server_id, const char* log_file_name,
my_off_t end_offset);
//发送binlog中的event到semi-sync slave,前面已经讲过,semi synchronous replication主要就是通过发送packet到各个slave上,用来匹配判断一个slave是否是semi-sync slave。
intrepl_semi_before_send_event(Binlog_transmit_param *param,
unsigned char*packet, unsigned long len,
const char*log_file, my_off_t log_pos)
{
return repl_semisync.updateSyncHeader(packet, log_file, log_pos, param->server_id);
}
int updateSyncHeader(unsignedchar *packet,
const char *log_file_name,
my_off_t log_file_pos,
uint32 server_id)
{
……………..
if (sync)
{
(packet)[2] =kPacketFlagSync; //即为在binlog_flag 中添加的一个byte
}
……………
}
//在发送event完之后等待,调用readSlaveReply读取semi-sync slave返回的信息
int repl_semi_after_send_event(Binlog_transmit_param*param,
const char*event_buf, unsigned long len)
{
if(repl_semisync.is_semi_sync_slave())
{
THD *thd=current_thd;
(void) repl_semisync.readSlaveReply(&thd->net,param->server_id, event_buf);
thd->clear_error();
}
return 0;
}
int ReplSemiSyncMaster::readSlaveReply(NET *net,uint32 server_id,
constchar *event_buf)
{
const char*kWho = "ReplSemiSyncMaster::readSlaveReply";
const unsignedchar *packet;
char log_file_name[FN_REFLEN];
my_off_tlog_file_pos;
…………
//计算网络等待时间
rpl_semi_sync_master_net_wait_num++;
rpl_semi_sync_master_net_wait_time += wait_time;
…………
//如果返回信息匹配成功,则调用reportReplyBinlog来返回报告信息
result = reportReplyBinlog(server_id, log_file_name,log_file_pos);
………..
return function_exit(kWho, result);
}
Slave端执行过程分析:
//调用binlog dump thread去连接master,并且读取binlog
int repl_semi_slave_request_dump(Binlog_relay_IO_param*param,
uint32 flags)
{
………
//校验master端有没安装semi-sync
query="SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
……….
//通知master的dump thread该slave一起开启semi-sync
query="SET @rpl_semi_sync_slave= 1";
………
}
//semi-sync读取master端传递过来的binlog,调用slaveReadSyncHeader来识别传送过来的packet
int repl_semi_slave_read_event(Binlog_relay_IO_param*param,
const char *packet, unsigned long len,
const char **event_buf, unsigned long*event_len)
{
if(rpl_semi_sync_slave_status)
return repl_semisync.slaveReadSyncHeader(packet, len,
&semi_sync_need_reply,
event_buf, event_len);
*event_buf=packet;
*event_len=len;
return 0;
}
//header就是master传递过来的packet
//need_reply是判断master是否会等待slave返回信息
int ReplSemiSyncSlave::slaveReadSyncHeader(const char*header,
unsignedlong total_len,
bool *need_reply,
constchar **payload,
unsignedlong *payload_len)
{
const char*kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
int read_res =0;
function_enter(kWho);
if ((unsigned char)(header[0]) == kPacketMagicNum)
{
*need_reply =(header[1] & kPacketFlagSync);
//将header传递过来的flag与自身的kPacketFlagSync进行匹配,其中header[1]为slave设置的binlog_flag,传递给master后,进行了设置后的新值,可以在updateSyncHeader函数中找到。
………….
}
//slave执行读取的event,执行完毕后调用slaveReply返回信息给master
int repl_semi_slave_queue_event(Binlog_relay_IO_param*param,
constchar *event_buf,
unsigned long event_len,
uint32flags)
{
if(rpl_semi_sync_slave_status && semi_sync_need_reply)
{
(void) repl_semisync.slaveReply(param->mysql,
param->master_log_name,
param->master_log_pos);
}
return 0;
}
int ReplSemiSyncSlave::slaveReply(MYSQL*mysql,
const char*binlog_filename,
my_off_tbinlog_filepos)
//调用mysql->net接口返回结果信息
在mysql 5.5版本中,半同步实现方法和google在5.0和5.1版本当中的方式有点不同,其实现为两个plugin的方式,而且对本身代码的侵入更小,当然这是在原本的代码上进行重构的前提下。
semi_sync_master 和semi_sync_slave
分别在主库和从库上注册。
install plugin rpl_semi_sync_master soname 'semisync_master.so';
install plugin rpl_semi_sync_slave soname 'semisync_slave.so';
实现原理:
MySQL在事务函数接口,Binlog发送接收接口,relay刷盘函数接口的实现中都提供了一个钩子。这些钩子实现为代理对象。通过向代理对象中注册观察者,可以实现在事件触发时调用观察者提供的回调函数。从而达到控制主从同步逻辑的目的。
semi-sync中有四个观察对象,主库中3个,从库中1个。
这些观察者的成员函数会在合适的时候被MySQL内部的代理对象回调。
master中三个观察者:
事务观察者,事务提交或回滚的时候回调。
Trans_observer{
Uint32_t len;
Int (*after_commit)(Trans_param *param)
Int (*after_rollback)(Trans_param *param);
}
Binlog存储观察者。在binlog写盘的时候回调。
Binlog_storage_observer{
uint32_t len;
Int (*after_flush)(Binlog_storage_param *param ,const char *log_file, my_off_t log_pos, uint32 flags)
}
Binlog dump观察者,会在binlog被发送到slave的过程中被调用。
Binlog_transmit_obaserver{
Uint32_t len;
Int (*transmit_start)(Binlog_transmit_param *param,const char *log_file, my_off_t log_pos);
Int (*transmit_stop)(Binlog_transmit_param *param);
Int (*reserve_header)(Binlog_transmit_param *param, unsigned char *header, unsigned long size,unsignedlong *len);
Int (*before_send_event)(Binlog_transmit_param *param,unsigned char *packet, unsigned long len,const char*log_file, my_off_t log_pos );
Int (*after_send_event)(Binlog_transmit_param *param,const char *event_buf, unsigned long len);
Int (*after_reset_master)(Binlog_transmit_param *param);
}
slave中一个观察者
Relay 日志IO观察者
Binlog_relay_IO_observer{
Uint32_t len;
int (*thread_start)(Binlog_relay_IO_param *param);
int (*thread_stop)(Binlog_relay_IO_param *param);
int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
int (*after_read_event)(Binlog_relay_IO_param *param,const char *packet, unsigned long len, const char**event_buf, unsigned long *event_len);
int (*after_queue_event)(Binlog_relay_IO_param *param, const char *event_buf, unsigned long event_len,uint32 flags);
int (*after_reset_slave)(Binlog_relay_IO_param *param);
}
mysql内部的代理对象,
在mysql启动时init_server_components的过程中,会执行delegates_init()函数。该函数初始化了四个代理对象:
Trans_delegate transaction_delegate;
Binlog_storage_delegate binlog_storage_delegate;
Binlog_transmit_delegate binlog_transmit_delegate;
Binlog_relay_IO_delegate binlog_relay_io_delegate;
这四个代理类都是Delegeate类的子类。在Delegate中有一个在其中注册的观察者列表。
typedef List<Observer_info> Observer_info_list;
所有对该代理中发生的动作感兴趣的观察者均可以通过add_observer(void *observer, st_plugin_int *plugin)函数来向代理注册。
当事件发生时,代理将循环调用在其上面注册的观察者提供的回调函数。
例如transaction_delegate代理对象在一个事务提交时被调用,则其会调用如下一个宏,唤醒所有的观察者进行相应的动作。
FOREACH_OBSERVER(ret, after_commit, thd, (¶m));
在mysql的代码中,通过一个宏来调用这四个代理,该宏定义如下
#define RUN_HOOK(group, hook, args) \
(group ##_delegate->is_empty() ? \
0 : group ##_delegate->hook args)
例如在存储引擎接口的父类handler当中,ha_commit_trans接口函数的最末尾发生如下一个调用。
RUN_HOOK(transaction, after_commit, (thd, FALSE));
该宏展开之后:
transaction_delegate->is_empty()? 0 : transaction_delegate->after_commit(thd,FALSE);
如果在transaction_delegate当中注册有观察者。则都会响应after_commit函数。
复制同步过程
有了以上这些接口提供的功能,要实现主从复制的同步,经如下流程。
- 主库事务提交时在提交前的最后一刻,进行如下调用。
RUN_HOOK(transaction, after_commit, (thd, FALSE));
该函数调用会导致主库SQL线程处于"Waiting for semi-sync ACK from slave",状态,等 待binlog event发送到从库之后得到的响应。
RUN_HOOK调用的事务观察代理最终会调用的是Trans_observer观察者的after_commit方法,该函数的参数Trans_param当中包含了当前事务提交的binlog_name和binlog_pos。该after_commit将binlog位置和从从库获得的binlog位置进行对比,如果从库已经对该binlog或该binlog之后的position发送了确认包。则提交该事务,否则事务将等待。
主库当中的binlog dump线程保持着reply_file_name和reply_file_pos两个变量来记录从库当中记录的最新的确认包。在reply_file_name.reply_file_pos之前的binlog都已经被从库接收并同步到磁盘。
replay_file_name和reply_file_pos两个变量,则由Binlog_transmit_delegate代理调用Binlog_transmit_obaserver观察者的回调函数after_send_event来维护。即在接收到新的确认包之后就对这两个变量进行更新。
- 在从库当中的binlog_relay_io_delegate代理负责调用relay_io_observer观察者来响应写relay日志的操作。在binlog写入从库当中的relay日志之后将向主库发送响应。响应回复当中包含了三个字段。这三个字段标志了从库当前写入的这个event对应主库当中的binlog的文件名和pos。
REPLY_MAGIC_NUM | REPLY_BINLOG_POS_ | REPLY_BINLOG_NAME |
在install plugin的过程中,向主库的三个代理对象注册三个监听者,分别监听事务提交事件,binlog写盘事件和binlog发送事件。
在从库的plugin加载的过程中,向一个代理对象注册一个监听者,监听relay的写盘事