MySQL5.5半同步复制实现原理

wjvi5807 9年前

来自: http://blog.csdn.net//jiao_fuyou/article/details/17161725


在semi-sync replication中,master等待binlog被成功写入到至少一个slave的relay log之后才会继续执行commit,否则会出现等待。当超过预定的等待时间之后,semi-syncreplication会被禁用,切换回异步再提交。

mysql复制(半同步)Semi-sync replication在一定程度上保证提交的事务至少有一个已经传到slave上了。

google提出了两个解决方法:full synchronous replication和semi-synchronous replication。但是full sync不仅要保证事务已经传递到slave上,而且还要等到事务在slave上执行完毕才能返回,因此不被推荐。

Semi-sync replication主要流程:

mysql复制(半同步)Detailed Design:

Semi-synchronous commit protocol

Our proposed semi-synchronous replication works in the followingway:

  1. commit the transaction

  2. wait for the replica databases acknowledge that they already received the transaction - this step has a timeout

  3. tell the client that the commit has been processed

在第2步中,如果replica 在预定的等待时间内没有返回信息,semi-sync replication会被禁用,切换回asynchronous replication。因此,一个transaction至少需要一次完成TCP环路的时间。

目前的mysqlreplication过程:

在slave端:

Slave调用safe_connect()去连接master;

COM_BINLOG_DUMP 获取master上的binlog信息,主要有:binlog_filename, binlog_pos, binlog_flag, server_id。

在master端:

处理COM_BINLOG_DUMP;

Mysql_binlog_send()向slave传送binlog事件。

因为binlog_flag是由slave发出的,并且在master中进行了设置后返回,所以semi-sync主要在binlog_flag上加了一个额外的头字节,用来判断slave是不是同步的目标。

semisync.h:

static const unsigned char kSyncHeader[2];

static constunsigned char kPacketMagicNum;

static constunsigned char kPacketFlagSync;

Packet 由三部分组成:

1.the magic num

2. thebinlog positon

3. the binlog filename

Master端的执行过程:

在master端创建了一棵搜索树用来存储等待事务。当一个事务提交,并且写到binlog中后,将(binlog_filename, binlog_pos)插入到搜索树中,这样做的目的是为了方便replication thread找出当前等待事务。当事务结束等待后,该事务记录就会被移除出搜索树。

Replication thread读取binlog event,并且检查binlog position是否在搜索树中,然后根据binlog position在搜索树中的位置,设置额外的头字节。

Semisync_master.cc:

int ActiveTranx::insert_tranx_node(const char*log_file_name, my_off_t log_file_pos)

{

…………………………

strncpy(ins_node->log_name_, log_file_name, FN_REFLEN-1);

ins_node->log_name_[FN_REFLEN-1] = 0;

ins_node->log_pos_ = log_file_pos;

}

mysql复制(半同步)//读取slave返回的信息获取binlog的pos,用来确定slave读取事务是否已经完成。

intrepl_semi_report_binlog_update(Binlog_storage_param *param,

const char *log_file,

my_off_t log_pos, uint32 flags)

{

………………….

//为了存储log_file,log_pos,这样的目的是为了确定binlog还需要多长时间才能复制到slave

error=repl_semisync.writeTranxInBinlog(log_file, log_pos);

…………………

}

int ReplSemiSyncMaster::writeTranxInBinlog(const char*log_file_name,

my_off_t log_file_pos)

{

……….

lock();

//更新最新事务提交位置

if(commit_file_name_inited_)

{

int cmp =ActiveTranx::compare(log_file_name, log_file_pos,

commit_file_name_, commit_file_pos_);

…………..

//将(log_file_name,log_file_pos)插入到search tree

if (is_on())

{

assert(active_tranxs_ != NULL);

if(active_tranxs_->insert_tranx_node(log_file_name,log_file_pos))

{

………….

}

}

……….

unlock();

………

}

int repl_semi_report_commit(Trans_param *param)

{

……………………

return repl_semisync.commitTrx(binlog_name, param->log_pos);

…………………

}

CommitTrx()的作用是:如果开启了semi-sync,会让binlog-dump thread去等待slave返回的信息,如果超过了等待时间,禁用semi-sync,通知其他事务都无需等待返回信息。

int ReplSemiSyncMaster::commitTrx(const char*trx_wait_binlog_name,

my_off_t trx_wait_binlog_pos)

//开始调用binlog_dump_thread,首先判断是否有新的semi-sync slave,如果有的话添加到semi-sync master的slave表中。判断完成后调用reportReplyBinlog,以确定该slave是否已经接收到所有的binlog event。

int repl_semi_binlog_dump_start(Binlog_transmit_param*param,

const char *log_file,

my_off_t log_pos)

{

bool semi_sync_slave= repl_semisync.is_semi_sync_slave();

if(semi_sync_slave)

{

repl_semisync.add_slave();

repl_semisync.reportReplyBinlog(param->server_id,log_file, log_pos);

}

…………………..

}

intreportReplyBinlog(uint32 server_id, const char* log_file_name,

my_off_t end_offset);

//发送binlog中的event到semi-sync slave,前面已经讲过,semi synchronous replication主要就是通过发送packet到各个slave上,用来匹配判断一个slave是否是semi-sync slave。

intrepl_semi_before_send_event(Binlog_transmit_param *param,

unsigned char*packet, unsigned long len,

const char*log_file, my_off_t log_pos)

{

return repl_semisync.updateSyncHeader(packet, log_file, log_pos, param->server_id);

}

int updateSyncHeader(unsignedchar *packet,

const char *log_file_name,

my_off_t log_file_pos,

uint32 server_id)

{

……………..

if (sync)

{

(packet)[2] =kPacketFlagSync; //即为在binlog_flag 中添加的一个byte

}

……………

}

//在发送event完之后等待,调用readSlaveReply读取semi-sync slave返回的信息

int repl_semi_after_send_event(Binlog_transmit_param*param,

const char*event_buf, unsigned long len)

{

if(repl_semisync.is_semi_sync_slave())

{

THD *thd=current_thd;

(void) repl_semisync.readSlaveReply(&thd->net,param->server_id, event_buf);

thd->clear_error();

}

return 0;

}

int ReplSemiSyncMaster::readSlaveReply(NET *net,uint32 server_id,

constchar *event_buf)

{

const char*kWho = "ReplSemiSyncMaster::readSlaveReply";

const unsignedchar *packet;

char log_file_name[FN_REFLEN];

my_off_tlog_file_pos;

…………

//计算网络等待时间

rpl_semi_sync_master_net_wait_num++;

rpl_semi_sync_master_net_wait_time += wait_time;

…………

//如果返回信息匹配成功,则调用reportReplyBinlog来返回报告信息

result = reportReplyBinlog(server_id, log_file_name,log_file_pos);

………..

return function_exit(kWho, result);

}

Slave端执行过程分析:

//调用binlog dump thread去连接master,并且读取binlog

int repl_semi_slave_request_dump(Binlog_relay_IO_param*param,

uint32 flags)

{

………

//校验master端有没安装semi-sync

query="SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";

……….

//通知master的dump thread该slave一起开启semi-sync

query="SET @rpl_semi_sync_slave= 1";

………

}

//semi-sync读取master端传递过来的binlog,调用slaveReadSyncHeader来识别传送过来的packet

int repl_semi_slave_read_event(Binlog_relay_IO_param*param,

const char *packet, unsigned long len,

const char **event_buf, unsigned long*event_len)

{

if(rpl_semi_sync_slave_status)

return repl_semisync.slaveReadSyncHeader(packet, len,

&semi_sync_need_reply,

event_buf, event_len);

*event_buf=packet;

*event_len=len;

return 0;

}

//header就是master传递过来的packet

//need_reply是判断master是否会等待slave返回信息

int ReplSemiSyncSlave::slaveReadSyncHeader(const char*header,

unsignedlong total_len,

bool *need_reply,

constchar **payload,

unsignedlong *payload_len)

{

const char*kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";

int read_res =0;

function_enter(kWho);

if ((unsigned char)(header[0]) == kPacketMagicNum)

{

*need_reply =(header[1] & kPacketFlagSync);

//将header传递过来的flag与自身的kPacketFlagSync进行匹配,其中header[1]为slave设置的binlog_flag,传递给master后,进行了设置后的新值,可以在updateSyncHeader函数中找到。

………….

}

//slave执行读取的event,执行完毕后调用slaveReply返回信息给master

int repl_semi_slave_queue_event(Binlog_relay_IO_param*param,

constchar *event_buf,

unsigned long event_len,

uint32flags)

{

if(rpl_semi_sync_slave_status && semi_sync_need_reply)

{

(void) repl_semisync.slaveReply(param->mysql,

param->master_log_name,

param->master_log_pos);

}

return 0;

}

int ReplSemiSyncSlave::slaveReply(MYSQL*mysql,

const char*binlog_filename,

my_off_tbinlog_filepos)

//调用mysql->net接口返回结果信息

 

 

 

在mysql 5.5版本中,半同步实现方法和google在5.0和5.1版本当中的方式有点不同,其实现为两个plugin的方式,而且对本身代码的侵入更小,当然这是在原本的代码上进行重构的前提下

semi_sync_master 和semi_sync_slave

分别在主库和从库上注册。

 
install plugin rpl_semi_sync_master soname 'semisync_master.so';
install plugin rpl_semi_sync_slave soname 'semisync_slave.so';

实现原理:

MySQL在事务函数接口,Binlog发送接收接口,relay刷盘函数接口的实现中都提供了一个钩子。这些钩子实现为代理对象。通过向代理对象中注册观察者,可以实现在事件触发时调用观察者提供的回调函数。从而达到控制主从同步逻辑的目的。

semi-sync中有四个观察对象,主库中3个,从库中1个。

这些观察者的成员函数会在合适的时候被MySQL内部的代理对象回调。

master中三个观察者:

事务观察者,事务提交或回滚的时候回调。

 
Trans_observer{

Uint32_t  len   

Int (*after_commit)(Trans_param *param)

Int (*after_rollback)(Trans_param *param);

}

Binlog存储观察者。在binlog写盘的时候回调。
Binlog_storage_observer{

uint32_t len;

Int (*after_flush)(Binlog_storage_param *param ,const char *log_file, my_off_t log_pos, uint32 flags)   

}

Binlog dump观察者,会在binlog被发送到slave的过程中被调用。
Binlog_transmit_obaserver{

Uint32_t len;   

Int (*transmit_start)(Binlog_transmit_param *param,const char *log_file, my_off_t log_pos);

Int (*transmit_stop)(Binlog_transmit_param *param);

Int (*reserve_header)(Binlog_transmit_param *param, unsigned char *header, unsigned long size,unsignedlong *len);

Int (*before_send_event)(Binlog_transmit_param *param,unsigned char *packet, unsigned long len,const char*log_file, my_off_t log_pos );

Int (*after_send_event)(Binlog_transmit_param *param,const char *event_buf, unsigned long len);

Int (*after_reset_master)(Binlog_transmit_param *param);

}

slave中一个观察者

Relay 日志IO观察者
Binlog_relay_IO_observer{  

Uint32_t len;

int (*thread_start)(Binlog_relay_IO_param *param);

int (*thread_stop)(Binlog_relay_IO_param *param);

int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);

int (*after_read_event)(Binlog_relay_IO_param *param,const char *packet, unsigned long len, const char**event_buf, unsigned long *event_len);

int (*after_queue_event)(Binlog_relay_IO_param *param, const char *event_buf, unsigned long event_len,uint32 flags);

int (*after_reset_slave)(Binlog_relay_IO_param *param);

}

mysql内部的代理对象

mysql启动时init_server_components的过程中,会执行delegates_init()函数。该函数初始化了四个代理对象:

Trans_delegate  transaction_delegate;

Binlog_storage_delegate binlog_storage_delegate;

Binlog_transmit_delegate binlog_transmit_delegate;

Binlog_relay_IO_delegate binlog_relay_io_delegate;

这四个代理类都是Delegeate类的子类。在Delegate中有一个在其中注册的观察者列表。

typedef List<Observer_info> Observer_info_list;

所有对该代理中发生的动作感兴趣的观察者均可以通过add_observer(void *observer, st_plugin_int *plugin)函数来向代理注册。

当事件发生时,代理将循环调用在其上面注册的观察者提供的回调函数。

例如transaction_delegate代理对象在一个事务提交时被调用,则其会调用如下一个宏,唤醒所有的观察者进行相应的动作。

FOREACH_OBSERVER(ret, after_commit, thd, (&param));

mysql的代码中,通过一个宏来调用这四个代理,该宏定义如下

 



#define RUN_HOOK(group, hook, args) \ 
(group ##_delegate->is_empty() ? \ 
0 : group ##_delegate->hook args)

例如在存储引擎接口的父类handler当中,ha_commit_trans接口函数的最末尾发生如下一个调用。

RUN_HOOK(transaction, after_commit, (thd, FALSE));

该宏展开之后:

 


transaction_delegate->is_empty()? 0 : transaction_delegate->after_commit(thd,FALSE);

如果在transaction_delegate当中注册有观察者。则都会响应after_commit函数。

复制同步过程

有了以上这些接口提供的功能,要实现主从复制的同步,经如下流程。

  • 主库事务提交时在提交前的最后一刻,进行如下调用。

    RUN_HOOK(transaction, after_commit, (thd, FALSE));

该函数调用会导致主库SQL线程处于"Waiting for semi-sync ACK from slave",状态,等 待binlog event发送到从库之后得到的响应。

RUN_HOOK调用的事务观察代理最终会调用的是Trans_observer观察者的after_commit方法,该函数的参数Trans_param当中包含了当前事务提交的binlog_name和binlog_pos。该after_commit将binlog位置和从从库获得的binlog位置进行对比,如果从库已经对该binlog或该binlog之后的position发送了确认包。则提交该事务,否则事务将等待。

主库当中的binlog dump线程保持着reply_file_name和reply_file_pos两个变量来记录从库当中记录的最新的确认包。在reply_file_name.reply_file_pos之前的binlog都已经被从库接收并同步到磁盘。

replay_file_namereply_file_pos两个变量,则由Binlog_transmit_delegate代理调用Binlog_transmit_obaserver观察者的回调函数after_send_event来维护。即在接收到新的确认包之后就对这两个变量进行更新。

  • 在从库当中的binlog_relay_io_delegate代理负责调用relay_io_observer观察者来响应写relay日志的操作。在binlog写入从库当中的relay日志之后将向主库发送响应。响应回复当中包含了三个字段。这三个字段标志了从库当前写入的这个event对应主库当中的binlog的文件名和pos

REPLY_MAGIC_NUM

REPLY_BINLOG_POS_

REPLY_BINLOG_NAME

install plugin的过程中,向主库的三个代理对象注册三个监听者,分别监听事务提交事件,binlog写盘事件和binlog发送事件。

在从库的plugin加载的过程中,向一个代理对象注册一个监听者,监听relay的写盘事