第9章 MySQL高可用实现

生成WriteSet 元素。这样,在有其他的事务操作父表的外键字段的时候,会检测出冲突,在从库就不能进行并行执行。

至此,MySQL 的并行复制功能介绍已完结。可以观察到,从5.6 版本至8.0 版本,MySQL 不断对并行复制功能进行优化。在MySQL 8.0 版本中,基于WriteSet 的方法已经基本能够解决主从复制的问题。然而,在实际应用中,主从复制延迟的原因比较多样化,例如从库所在机器性能不足等。在这些情况下,即便采用基于WriteSet 的并行复制技术,仍然可能出现延迟。

9.2 组复制

在我们使用或者维护MySQL 的时候,经常会遇到两个棘手的问题:

  • 高可用。在MGR 出现之前,MySQL 没有提供自动切换的方案,这就需要数据库维护人员部署开源高可用方案或者自行开发。无论何种方案,要想稳定运行都需要经过各种场景的长期验证。
  • 数据一致性。在本章前面介绍主从复制及半同步复制的时候,我们知道MySQL 默认的异步复制是存在数据丢失的风险的,即使是半同步复制,在极端情况下还是会存在数据丢失的问题。

为了解决上述问题,在MySQL 5.7 版本中引入了MGR,基于一致性协议Paxos 实现底层数据同步,在协议上保证数据的一致性,并且提供了节点自动故障转移功能。同时,MGR 还支持单主模式和多主模式,这就意味着应用可以在多个节点上同时写入。下面介绍MGR 是如何实现这些功能的。

9.2.1 总体架构

MGR 是以插件方式实现的,它跟半同步插件类似,最终通过回调函数来调用对应的方法。不过MGR 的实现较为复杂,主要原因是它在底层实现了Paxos 协议的变种,不仅要实现数据流和控制流,跟上层MySQL Server 交互,还需要处理多种异常情况。MySQL MGR 的主要结构,如图9-8 所示。

MGR 插件主要包含4 层:插件管理相关、GCS(Group Communication System)层、GCS XCOM Proxy层、XCOM层。其中插件管理相关包括插件初始化、启动、UDF 函数、相关参数设置等功能模块,由于篇幅原因在图9-8 中未体现。下面我们具体介绍GCS 层、GCS XCOM Proxy 层、XCOM 层。

图9-8 说明

图9-8 展示了 MySQL MGR 的主要结构,包含 GCS 层、GCS XCOM Proxy 层和 XCOM 层,以及各层中的核心模块与线程。由于无法直接渲染图片,以下用 Mermaid 图简要示意其层次关系(非完整细节):

graph TB
    subgraph GCS层
        GCS_Module[GCS 模块]
        message_service[message service]
        autorejoin[autorejoin_thread_handle]
        session_handler[session_thread_handler sql]
        consumer[consumer_function]
        clone[clone_thread_handle]
        broadcast[launch_broadcast_thread]
        partition[partition_thread_handler]
        execute_action[execute_group_action_handler]
        applier[applier_thread_handle]
        recovery[recovery_thread_handle]
        primary_election[primary_election_process_handler]
        secondary_election[secondary_election_process_handler]
        incoming[incoming]
    end
    subgraph GCS XCOM Proxy层
        suspicions[suspicions_processing_thread]
        process_notif[process_notification_thread]
        m_xcom_input[m_xcom_input_queue]
        m_notification[m_notification_queue]
    end
    subgraph XCOM层
        xcom_taskmain2
        local_server
        proposer_task
        acceptor_learner_task
        executor_task
        tcp_server
        cache_manager_task
        sender_task
        local_sender_task
        terminator_task
        sweeper_task
        alive_task
        reply_handler_task
        tcp_reaper_task
        detector_task
        prop_input_queue
        outgoing_channel[outgoing channel]
        global_state_machine[全局状态机哈希表]
    end
    GCS_Module --> m_xcom_input
    m_xcom_input --> local_server
    local_server --> prop_input_queue
    prop_input_queue --> proposer_task
    proposer_task --> outgoing_channel
    outgoing_channel --> sender_task
    sender_task --> tcp_server
    tcp_server --> acceptor_learner_task
    acceptor_learner_task --> executor_task

详细模块功能请参考后续文字描述。

1. GCS 层

GCS 层包含GCS 模块、应用模块、选举切换模块等,主要负责跟上层MySQL Server 以及下层XCOM 模块的交互,具体包括:

  • message service。该模块启动 launch_message_service_handler_thread 线程,主要用于在插件内跟MySQL Server 层完成消息交互。
  • autorejoin_thread_handle。该模块会开启 autorejoin_thread_handle 线程,在节点异常宕机或者重启时会根据情况重新加入集群。
  • session_thread_handler sql。该模块会开启 session_thread_handler 线程,主要负责执行一些内部命令,例如设置只读、杀掉会话、等待GTID 执行完成等。
  • consumer_function。由 consumer_function 线程负责,主要负责将打印日志输出到MySQL 错误日志文件中,日志模块还实现了异步写入。详细可参考 plugin/group_replication/libmysqlgcs/src/interface/gcs_logging_system.cc 文件。
  • clone_thread_handle。该模块会开启 clone_thread_handle 线程,主要用于在集群创建的时候自动同步存量的数据。在克隆特性引入之前,需要通过 mysqldumpxtrabackup 等方式将存量数据导入到其他实例中。
  • launch_broadcast_thread。主要负责两部分工作,一部分是进行事务冲突检测,由应用线程触发,另外一部分是开启 launch_broadcast_thread 线程,该线程定期开启流控和广播已经执行的GTID 信息到集群中。
  • partition_thread_handler。处理网络分区的情况。
  • execute_group_action_handler。主要处理单主转换为多主、主动切主等消息。
  • applier_thread_handle。应用线程,主要处理数据流和控制流的消息。例如,MySQL 层的事务最终就是由该线程进行应用的。
  • recovery_thread_handle。该模块会开启 recovery_thread_handle 线程,当节点发生切换后,或者加入到集群的时候,会根据情况看是否需要进行数据恢复补齐。
  • primary_election_process_handler。在集群切换的时候,负责处理领导者切换相关动作,例如等待事务应用完成、退出只读模式等。
  • secondary_election_process_handler。在集群切换的时候,负责处理跟随者切换相关动作,例如设置只读模式、等待事务应用完成等。
  • incoming。用于数据流和控制消息的传输,主要由 applier_thread_handle 线程进行消费处理。
  • GCS 模块。从MySQL Server 层接收消息,进行处理后发送到XCOM 层,同时从XCOM 层接收消息进行处理。这些消息包括数据流消息、控制流相关消息,例如节点加入和移除等。详细可参考 plugin/group_replication/src/gcs_event_handlers.cc 文件。

2. GCS XCOM Proxy 层

GCS XCOM Proxy 层的主要作用是衔接GCS 层和XCOM 层,里面定义了很多接口和方法,限于篇幅,这里不会详细介绍,感兴趣的读者可以参考 plugin/group_replication/libmysqlgcs/src/bindings/xcom/ 下的文件。这里主要介绍其开启的两个比较重要的线程以及对应的两个队列,在图9-8 中从左到右依次为:

  • suspicions_processing_thread。处理有问题的节点,节点如果超时就从本地节点列表中移除节点,如果没有超时就检查是否有丢失消息,有丢失消息就设置丢失消息标志位并且打印warning 日志。
  • m_xcom_input_queue 队列。用于GCS 层发送控制流消息和数据流消息给XCOM 层,主要是用户线程发送事务消息到该队列,local_server 任务从该队列进行消费处理。
  • m_notification_queue 队列。该队列用于传递一些节点管理的对象消息以及事务处理的回调对象消息。由 process_notification_thread 线程消费处理。
  • process_notification_thread。主要用于节点管理相关消息的传递,例如节点添加、节点删除等。

3. XCOM 层

XCOM 层实现Paxos 协议的变种,其中包含Paxos 通信、消息传输、成员变更等功能,主要调度的是一些处理Paxos 协议消息交互的任务。

MGR 插件的底层就是XCOM 层,在这之前MySQL 使用的是Corosync 组件实现,在后期采用全部自研。XCOM 层主要包含 xcom_taskmain2 模块,它开启了一个线程,并在该线程中实现了一个协程机制,定期调度不同的任务。表9-9 所示为 xcom_taskmain2 调度的常见任务。

任务名称描述
local_server用于获取本地GCS 层发来的消息,然后进行处理
proposer_task负责发送消息给本地节点和远程节点,默认有10 个 proposer_task 任务
acceptor_learner_task读取 tcp_server 接收到的数据进行处理,也就是处理其他节点发送过来的Paxos 消息
executor_task读取Paxos 协议层的消息,然后执行已经到learn 阶段的消息
tcp_server用于监听套接字端口,并且创建对应的任务进行处理
cache_manager_task维护状态机缓冲,每隔0.1s 就检查是否扩容或者收缩状态机缓存
sender_task接收本地 proposer_task 发来的消息,然后发送给远程节点
local_sender_task接收本地 proposer_task 发来的消息,然后本地进行处理
terminator_task用于延迟终止xcom 实例
sweeper_task用来跳过noop 消息,在后续会详细介绍
alive_task当节点空闲超过0.5s 的时候,会每秒定时向其他节点发送 i_am_alive_op 心跳信息,当发现某个节点超过4s 没有回复的时候,会发送 are_you_alive_op 类型的心跳消息
reply_handler_taskacceptor_learner_task 的逻辑相似,不过它只监听用于发送消息的套接字,所以它主要用于处理发送消息后的回复,一般用在最开始协议协商的阶段
tcp_reaper_task负责清理长时间空闲的连接
detector_task定期向其他节点发送view 消息

另外,跟上述任务协作的还有两个队列和一个哈希表:

  • prop_input_queue 队列。用于数据流消息和控制流消息的传输,由 local_server 任务生产,由 proposer_task 任务进行消费处理。
  • outgoing channel 队列。主要用于将消息传递给 sender_task 任务,在MGR 中有多个outgoing 队列,每个队列对应一个 sender_task 任务,最终由 sender_task 任务将消息发送给其他节点。主要由 proposer_task 任务生产。
  • 全局状态机哈希表。用户缓存全局Paxos 状态机。

至此,MGR 插件的主要模块介绍完毕。可以看到,MGR 插件的实现较为复杂,各个线程分工明确,不同的模块负责的工作不同,并且模块之间还会有交互。下面介绍数据流,以及这些模块是如何协作运行起来的。

9.2.2 数据流

数据流分为四层:

  1. MySQL Server 层。在事务提交的之前会调用回调函数将事务数据发送到GCS 层。
  2. GCS 层。将事务数据转换成GCS 层对应的对象。
  3. GCS XCOM Proxy 层。将GCS 对应的事务对象进行数据序列化,转换成Paxos 消息需要的格式。
  4. XCOM 层。这层逻辑最为复杂,是实现Paxos 协议的关键地方。

在详细介绍数据流之前,我们需要大概了解一下Paxos 协议。Paxos 协议是一个分布式强一致协议,相对比较复杂,由于篇幅原因,这里只介绍相关概念以及大概流程,方便后续章节的理解。首先看看常见的数据库集群方案:

  • MySQL 异步复制。MySQL 默认的同步策略为异步复制,在写入量较大的时候从库会发生延迟,这个时候主库宕机可能造成从库数据未追平,从而导致数据丢失。
  • MySQL 半同步复制。半同步复制是以MySQL 插件的方式提供的,它的核心思想是保证主库的事务在提交前要写入从库的中继日志中,这样就解决了异步复制的数据丢失问题。不过,默认情况下在主从延迟过大的时候会退化到异步复制下,这样跟异步复制一样会造成数据丢失。当然,如果强制设置不进行退化就可以保证数据一致性,这里其实就可以理解为同步复制了,同步复制会牺牲可用性,如果从库宕机,主库将无法写入。
  • 多数派读写。在集群有领导者且单点写入的时候,这个时候是可以保证强一致的,例如M例如MongoDB 在Primary 写入,设置Write Concern 策略为Majority,保证写入到大部分节点。如果集群多个节点可写入,就可能会造成数据不一致。例如早期版本的Cassandra,它只能保证线性一致,如果多个节点操作同一条数据就可能造成数据不一致。不过Cassandra 在2.0 版本的时候基于Paxos 协议实现了分布式强一致。

大部分主流数据库都未实现分布式强一致,在一些极端的场景下可能会发生数据丢失等情况。根据上述集群方案,可以总结表述为,一个可用性较高的分布式强一致协议需要具备以下特性:

  • ❑ 可用性保证,至少需要三个节点。
  • ❑ 强一致性保证,需要是多数派的写入。
  • ❑ 分布式特性,能提供多节点写入。

上述三个特性是分布式强一致协议最基础也是最重要的特性。可以看到,要实现分布式强一致协议,其实在多数派读写上进一步优化即可。多数派写入的核心问题是在多个节点同时操作一样的数据时可能造成不一致。那么我们尝试解决这个问题。

多个节点同时操作一条数据有问题,其核心是无法保证顺序性。如果把操作的数据加上版本信息?在写入之前先读取数据的版本信息,然后在写入的时候确认版本信息没有变化再进行写入。这种思想跟CAS(Check-And-Set 或Compare-And-Swap)协议相似,在Memcache 中就采用该协议解决了并发操作同一个键的问题。但这里有一个问题,Memcache 是单节点,获取版本和写入都只用考虑单个节点。下面举例来看看多节点的分布式强一致性协议基于这种思想会有什么问题。

假设有A、B、C 三个节点,有2 个客户端同时操作一个名为a 的键,客户端1 执行a=1,客户端2 执行a=2。如果它们同时执行,那么都需要进行多数派读来确定版本信息,因为同时执行时它们拿到的版本信息是一致的。这个时候它们再同时写入,会发生什么?如果客户端1 在A 节点上执行,客户端2 在C 节点上执行。最终可能有如下3 种情况:

  • ❑ 客户端1 执行成功,客户端2 执行失败。这种情况是在客户端2 执行的时候,客户端1 已经执行完成并且同步给所有节点了。客户端2 再执行的时候发现版本信息不对,就退出执行。
  • ❑ 客户端1 执行失败,客户端2 执行成功。这种情况跟上面一样,就是客户端2 比客户端1 快。
  • ❑ 客户端1 执行成功,已同步到大部分节点,客户端2 执行异常。客户端1 在A 和B 上执行成功,客户端2 在C 上执行成功。这个时候无论客户端1 的数据同步到C 还是客户端2 的数据同步到A 或B 都会发生冲突。

可以看到,在数据有版本的时候,可以在一定程度上解决多节点操作同一条数据的问题,不过并不能完全解决,在最后一种情况下是有问题的。这个时候就需要相关的处理策略,主要思想是让客户端1 和客户端2 要有全局的顺序性,如果发生了冲突则需要有对应的策略进行处理。这也接近了Paxos 协议的思想,下面就来简单看看Paxos 协议是如何实现的。

先来介绍Paxos 协议的相关概念:

  • client(客户端)。发送请求给分布式集群。
  • proposer(提议者)。它接收客户端的请求,将请求发送给接受者,同时它也是一个提议者,处理数据发生冲突等情况。
  • acceptor(接受者)。用于接收提议者发来的请求。
  • learner(学习者)。当提议者发送的请求在所有接受者中达到多数派标准之后,最终将请求发送给学习者节点,由学习者节点负责执行,不过一般情况下学习者和接受者为同一个节点。
  • leader(领导者)。这里的领导者也是一个提议者,是所有节点选举出来的。
  • proposal number and agreed value(提议号和商定值)。在每次提议者发送请求的时候,都会携带一个唯一的proposal number 还有需要同步的数据,也就是agreed value。接受者收到之后会比较发送过来的proposal number 和本地的谁大,如果发送过来的较大则接收请求,然后将agreed value 保存在本地,如果较小则拒绝请求并不做回应。
  • quorum(多数派)。在提议者发送请求后,需要多数派的接受者做出回复才能进入下一阶段。
  • state machine(状态机)。状态机是一种数据模型,在Paxos 中,一系列有序的Paxos 日志组成了Paxos 状态机。不同的节点要达到相同的状态,需要借助状态机进行相同顺序的执行。

知道了Paxos 协议的相关概念后,我们来介绍Paxos 协议的执行流程。在最基础的Paxos 协议中,客户端发送过来的请求会经历两个阶段:准备阶段和接受阶段。

先来介绍准备阶段。提议者接收客户端的请求,然后发送准备消息,其实就是发送带有proposal number 的消息给所有的接受者,注意这里不携带对应的值。接受者收到后,会对比发送过来的proposal number 和最近收到的proposal number。如果发送过来的较大,则接收请求,然后将发送过来的proposal number 保存在接受者中,后续将忽略所有小于该proposal number 的请求。如果在这之前接受者已经接收过其他提议者发来的值了,则返回接受的提议者对应的proposal number 和值,如果没有接受过则返回promise 消息即可。如果小于或者等于最近收到的proposal number,则忽略这个请求,不做任何回复。针对这种情况,这里有个优化,就是接受者可以回复拒绝消息,提议者收到后就可以及时终止本轮请求。

这里有个问题,如何判断接受者已经接收过值了?其实Paxos 的维度是针对该轮请求来判断,例如客户端1 和客户端2 分别从A 节点和C 节点发起请求,它们都在同一轮发起请求。那这个时候它们肯定会产生冲突,注意这里的冲突并不是发送的消息内容冲突,而是Paxos 协议顺序性的冲突。为了解决该冲突就有上述的判断,判断proposal number 的大小和接受者是否在本轮已经接收过值了,注意已经接收过值说明某个客户端的请求已经到达了第二阶段,因为第一阶段的请求是不带值的。所以一般Paxos 的实现内部都会创建多个状态机对象,里面保存了Paxos 消息内容,各个阶段情况,他们以消息编号作为唯一确定的值,通过消息编号就能找到对应的状态机对象。在状态机对象中保存每一阶段的值和proposal number,这样就可以进行判断了。

在这里又引入了两个问题:①Paxos 协议层解决的是其协议请求执行顺序的冲突,那上层应用的数据冲突谁来解决呢?例如MGR 的场景在两个节点写入可能会造成事务冲突,MGR 则实现事务认证、冲突模块来解决,在后续章节会详细介绍。②Paxos 协议层冲突的核心原因是请求发送都在同一轮里面,正常如果只有一个提议者发送请求,其实是没有冲突情况的,这个时候是不是可以省略第一阶段。如果是多个提议者我们能不能让它们在发起请求的时候就不在同一轮里面呢,这样它们是不是也不会冲突。这里说的其实就是Paxos 的multi paxos 的优化思想,MGR 已经实现,在后续的章节中会详细介绍。

再来介绍接受阶段。当准备阶段提议者收到多数派接受者节点回复的时候,表示请求可以继续,这个时候提议者再次向所有的接受者节点发送请求,请求内容包括proposal number 和对应的值,注意这次需要携带值,这里发送的proposal number 和值可能是自己的也可能是其他客户端的,在准备阶段的时候接受者如果已经接受了其他提议者的值,这时候给提议者返回的就是它接受的proposal number 和对应的值,提议者收到之后需要将该proposal number 和对应值发送给所有的接受者,这里其实是一种修复,之前的客户端可能由于各种异常最终没有完成整个请求,那么这种情况下其他提议者收到之后就协助其完成整个请求。同样接受者在收到请求的时候,会对比发送过来的proposal number 和最近收到的proposal number,如果较大或者等于则将值保存到本节点上,然后发送返回的请求给提议者节点表示已经接受消息。如果较小,则忽略该请求,不做任何回复。

下面我们举例说明两阶段情况。第一种情况的Paxos 两阶段流程如图9-9 所示,客户端1 首先向A 节点发送请求,此时A 为本次请求的提议者,提议者向其他接受者(A、B、C 节点)发送请求,注意A 也是接受者,proposal number 为1,slot 为1,这里的槽可以表示消息号,也是确定当前轮次的唯一值。由于在第一个槽期间,三个节点都没有接收请求,所以本次请求的proposal number 都大于接受者节点的proposal number。所以所有的接受者都接受了请求。此时所有的接受者会把接收到的proposal number 保存到本节点上。

图9-9 第一种情况的Paxos 两阶段流程

图9-9 展示了客户端1 和客户端2 在相同槽(slot=1)上分别发起请求时的两阶段流程。由于客户端1 先发起且proposal number=1,客户端2 发起时proposal number=2,导致客户端1 的第二阶段被拒绝,客户端2 的第二阶段成功,最终值被设置为 set x=2。原文中提供了详细的箭头标注,因缺少图片,此处概括逻辑。

然后客户端2 向C 节点发送请求,此时C 节点为本次请求的提议者,提议者向其他接受者(A、B、C 节点)发送请求,注意C 节点也是接受者,proposal number 为2,slot 为1。由于在槽1 期间所有的接受者已经接受了客户端1 的请求,所以对应的proposal number 为1,由于本轮请求的proposal number 为2,所以所有的接受者会接受请求。最终所有接受者会把接收到的proposal number 保存到本节点上。

此时客户端1 的请求进入第二阶段,由提议者(A 节点)发送请求给所有的接受者(A、B、C 节点)。请求的内容为(proposal number:1,value:set x=1)。当所有的接受者接受到这个请求时,对比发现提议者发送过来的proposal number 小于自己的,这时候接受者会忽略该请求。

然后客户端2 的请求进入第二阶段,由提议者(C 节点)发送请求给所有的接受者(A、B、C 节点)。请求的内容为(proposal number:2,value:set x=2)。当所有的接受者接受到这个请求时,对比提议者发送过来的proposal number 跟自己相等,这个时候接受者会接受该请求,然后对应的值保存到本节点上。

上述例子比较简单,下面再看看第二种情况,第二种情况的Paxos 两阶段流程如图9-10 所示。

图9-10 第二种情况的Paxos 两阶段流程

图9-10 展示了一个更复杂的场景,其中客户端1 的提议者A 先发起第一阶段(proposal number=1),但客户端2 的提议者C 随后发起更高编号(proposal number=2)的第一阶段,接着又发起更高编号(proposal number=3)的第一阶段,最终客户端2 的phase2 用proposal number=3 设置值 set x=2,而客户端1 的phase2 因proposal number=1 过小被忽略。图中标注了多次proposal number 的变化。详细步骤请参照文字描述。

(由于原文在图9-10 之后还有一段文字,但根据停顿点,当前部分应结束于此处。继续后续内容:下一小节9.2.3 Paxos优化)

9.2 第9章 MySQL组复制与总结

图9-10 第二种情况的Paxos两阶段流程

MySQL内核设计与实现

首先客户端1 向A 节点发送请求,此时A 节点为本次请求的提议者,提议者向其他接受者(A、B、C 节点)发送请求,proposal number 为1,slot 为1。由于在第1 个slot 期间,三个节点都没有接受请求,所以本次请求的proposal number 都大于接受者节点的proposal number。所以所有的接受者都接受了请求。此时所有的接受者会把接收到的proposal number 保存到本节点上。

然后客户端2 向C 节点发送请求,此时C 节点为本次请求的提议者,提议者向其他接受者(A、B、C 节点)发送请求,proposal number 为2,slot 为1。由于在第1 个slot 期间所有的接受者已经接受了客户端1 的请求,所以对应的proposal number 为1,由于本轮请求的proposal number 为2,所以所有的接受者会接受请求。最终B 节点接受者已经把接收到的proposal number 保存到本节点上,A 节点还未收到请求。

此时客户端1 的请求进入第二阶段,由提议者(A 节点)发送请求给所有的接受者(A、B、C 节点)。请求的内容为(proposal number:1,value:set x=1)。当所有的接受者接受到这个请求时,对比发现提议者发送过来的proposal number,此时节点B 和C 会忽略该请求。

然后客户端2 的请求进入第二阶段,由提议者(C 节点)发送请求给所有的接受者(A、B、C 节点)。请求的内容为(proposal number:2,value:set x=2)。当所有的接受者接受到这个请求时,对比发现提议者发送过来的proposal number,这个时候B 节点接受者已经接受该请求,A 节点还未收到请求。

然后客户端1 向A 节点再次发送请求,此时A 为本次请求的提议者,提议者向其他接受者(A、B、C 节点)发送请求,proposal number 为3,slot 为1。当所有的接受者接受到这个请求时,对比发现提议者发送过来的proposal number。接受者的proposal number 都比3 小,都接受了提议者请求。不过接受者在slot1 任期里面已经接受过值了,这个时候会将已经接受的proposal number(2)和对应的值(set x=2)返回给提议者。

此时客户端1 的请求进入第二阶段,由提议者(A 节点)发送请求给所有的接受者(A、B、C 节点)。请求的内容为(proposal number:3,value:set x=2)。当所有的接受者接受到这个请求时,对比发现提议者发送过来的proposal number,所有接受者值都相等,这个是把提议者发送过来的值(set x=2)保存在本节点中。

上述情况比较特殊,一般在C 节点跟A 节点通信有延迟等情况下产生。这也很好地模拟网络分区时Paxos 请求发生冲突的处理,最终可以看到冲突处理其实以谁最先完成第二阶段为主,未完成的节点后续也是对已完成节点的一个修复工作,最终保证在所有的节点上执行成功。

其实在Paxos 协议中还有一个阶段是将第二阶段完成的请求再发送给学习者节点,一般情况下学习者节点跟接受者是同一个节点,在MGR 中也是这样的,最终到了learn(学习)阶段后就可以进行应用执行了。

至此Paxos 协议已经介绍完毕,我们可以看到Paxos 相比之前我们提到的多数派读写,在同一轮或者同一任期中,它提供了proposal number 来控制执行顺序。又通过一些策略来控制冲突,比如在上述例子中最终冲突了则选择已经完成两阶段的请求,其他冲突的请求则失败,并且为成功的请求做修复。

简单了解了Paxos 协议后,我们就能更好地理解MGR 内部的数据执行流程了。在MGR 中,数据流比较复杂,它不仅涉及Paxos 协议,还需要跟上层MySQL 交互,领导者端处理事务提交流程,跟随者端处理事务应用的流程。并且在多主等情况下还需要处理数据冲突的情况。MySQL MGR 数据流的架构如图9-11 所示。

由于篇幅问题,图9-11 中只展示了一个领导者和跟随者节点,实际场景中会有多个跟随者或者多个领导者。接下来分模块进行详细介绍。

图9-11 MySQL MGR数据流架构

(详细文字描述见下节)

(1)MySQL层(领导者)

在领导者端的MySQL 层,客户端发起请求到MySQL 层,MySQL 层进行处理,最终进行事务提交。在正常的流程中,事务提交就会调用存储引擎的接口,随即完成提交。不过在MGR 中,在进行存储引擎层的提交之前会调用回调函数,回调函数会调用MGR 插件的底层方法对事务数据进行处理,然后通过Paxos 协议发送到其他的跟随者中。图9-11 左上角中的详细步骤如下:

1)提交第1 步,用户线程开始进行事务提交,如果开启了MGR 插件,则会调用对应的回调函数,也就是group_replication_trans_before_commit 方法。

2)提交第2 步,回调方法会将事务数据,也就是用户线程中的binlog 缓存数据发送到GCS 层,然后会加锁进行等待,等待MGR 进行事务数据的处理和发送到其他跟随者节点。

3)提交第3 步,等待MGR 处理完成之后,会将锁释放。

4)提交第4 步,释放锁之后继续进行事务提交流程,进行后续的存储引擎层提交。

(2)MGR GCS层(领导者)

GCS 层在收到上层MySQL 发来的binlog 缓存数据后,开始进行处理,主要就是生成MGR 需要的事务数据,然后最终序列化成Paxos 的消息。主要经历如下几个阶段:

1)生成Transaction_msg 对象数据,它的内部结构如图9-12 所示。

图9-12 Transaction_msg的内部结构

可以看到,Transaction_msg 分为以下三部分:

  • transaction context event。这是在GCS 生成的事件,是MGR 新增的事件类型,它主要存储server uuid、线程ID、GTID 集合,以及后续用于冲突检测的WriteSet,其生成方式跟之前介绍的基于WriteSet 并行复制的逻辑一致。

  • GTID_EVENT。这里的GTID_EVENT 也在GCS 层生成,因为事务的GTID_EVENT是在InnoDB 存储引擎层生成的,这里回调函数是在之前调用的,所以binlog 缓存中没有GTID_EVENT,需要MGR 自己生成。注意这里生成的GTID 信息其实是没有gno 的,gno 会在后续申请,在9.2.4 小节中会详细介绍。

314
MySQL内核设计与实现

  • binlog 缓存。由上层MySQL 传递下来,里面包含整个事务产生的binlog 事件,例如QUERY_EVENT、TABLE_MAP_EVENT、ROWS_EVENT、XID_EVENT 等,根据不同的事务,最终binlog 缓存中包含的事件也不一样。

最终,Transaction_msg 其实就是申请了一块内存,将transaction context event、GTID_EVENT、binlog 缓存复制到内存中。

2)转换成Gcs_message 对象。到了GCS 层后,事务消息需要封装成GCS 层对应的Gcs_message 对象,Gcs_message 的内部结构如图9-13 所示。

图9-13 Gcs_message的内部结构

可以看到,Gcs_message 同样分为三部分,内容如下:

  • Gcs_member_identifier,是存储当前节点在集群中的member id,主要标识消息的发送来源。

316
MySQL内核设计与实现

  • Gcs_group_identifier,存储集群group id 信息,标识消息属于哪一个集群。

  • Gcs_message_data,存储之前生成的Transaction_msg 数据,不过这里进行了简单的序列化,例如上图所示,Transaction_msg 存储在payload 字段中,在之前还存储了一些头信息。

3)转换成Gcs_packet 对象,从而将数据组装成更好的形式发送到网络缓冲区中。Gcs_packet 的内部结构如图9-14 所示。

图9-14 Gcs_packet的内部结构

可以看到,Gcs_packet 分为以下四部分:

  • fixed header,用于存储一些头部信息,主要是记录协议版本的信息、消息类型等。

  • dynamic headers,在各个阶段生成的动态头信息,主要是用于压缩和分裂相关阶段。

  • stage metadata,存储在各个节点的一些元数据信息。

  • payload,存储Gcs_packet 中的Gcs_message_data 数据,这里面包含了Transaction_msg 的数据。

4)转换成app_data 对象。最终在发送到XCOM 层的时候,需要封装成app_data 对象,这是MGR 中Paxos 消息对象中的一个元素,在XCOM 中收到GCS 发来的app_data 对象会新建一个Paxos 消息对象,然后将app_data 保存在Paxos 消息对象中。这里将Gcs_packet 的所有部分都序列化成对应的字符数组,最终存储到app_data 中。app_data 中的相关字段比较多,就不一一介绍了。

经过以上一系列的转换和序列化,最终将binlog 缓存的内容序列化存储在app_data 对象中,然后将app_data 对象发送到m_xcom_input_queue 队列,接着发送到XCOM 层。

(3)MGR XCOM层(领导者/跟随者)

在XCOM 层中,xcom_taskmain2 通过调度local_server 消费m_xcom_input_queue 队列中的消息,拿到app_data 对象后生成Paxos 消息对象。Paxos 消息对象的主要内容如下:

node_no to            // 消息发送的目标端.
node_no from          // 消息发送的源端.
uint32_t group_id     // 消息所属的复制组.
synode_no max_synode  // 当前节点最大的msgno.
start_t start_type    // 消息所属的阶段,启动或者恢复阶段.
ballot reply_to       // 返回的paxos 消息的proposal number.
ballot proposal       // 发送的paxos 消息的proposal number.
pax_op op             // 定义paxos 操作类型,主要是几个阶段的操作消息,例如:prepare_op、ack_prepare_op、accept_op、ack_accept_op 等.
synode_no synode      // 定义message number.
pax_msg_type msg_type // 定义消息类型,主要是有normal、no_op、multi_no_op 三种消息类型.
bit_set *receivers   // 用位图保存该消息所有的接受者,通过该字段可以来判断消息是否达到majority.
app_data *a          // app_data 在上述已经提到,它用于存储事务数据.
snapshot *snap       // 当op 类型为snapshot 类型,则存储snapshot 数据.
gcs_snapshot *gcs_snap    // 当op 类型为gcs_snapshot_op,则存储gcs_snapshot 数据.
client_reply_code cli_err // 返回错误码,主要有REQUEST_OK、REQUEST_FAIL、REQUEST_RETRY 三种类型.
bool_t force_delivery     // 强制投递消息,即使是上阶段消息没有达到majority.
int32_t refcnt            // 记录reference count.
synode_no delivered_msg   // 广播上一次投递的消息.
xcom_event_horizon event_horizon                 // -
synode_app_data_array requested_synode_app_data  // -

在生成Paxos 消息对象后,local_server 任务会将Paxos 消息对象发送到prop_input_queue 队列中,proposer_task 任务进行消费,这里就代表着Paxos 协议正式开始了。

前面提到最基础的Paxos 主要包含两阶段流程(准备阶段和接受阶段),在MGR 中则分别定义了三阶段和两阶段流程。三阶段有prepare、proposer、learn,两阶段有proposer、learn。那什么时候需要三阶段,什么时候需要两阶段呢?在以下几种情况需要进行三阶段:

  • 发送的消息类型是add_node_type、remove_node_type、force_config_type,也就是增删节点、改变集群配置时。
  • 调用modify_configuration 方法时,其实也是改变集群配置。

剩下的情况都是可以进行两阶段的,例如正常的数据写入。其实相比原生的Paxos 协议,这里的三阶段是与之一致的,也就是把learn 阶段也算在接受阶段内了,而这里的两阶段是省略了原生Paxos 协议的第一阶段。这里为什么可以省略呢?

其实第一阶段主要进行写前读取,第二阶段才进行真正地写入。在一些情况下是可以省略第一阶段的,比如集群只有一个节点接受客户端的请求,那么此时也就是只有一个提议者。在只有一个提议者的时候是不会发生冲突的,那么此时直接进行第二阶段不会有什么问题。那我们知道MGR 是支持单主跟多主模式的,多主模式下是可以有多个节点接受客户端的写入请求的。这个时候为什么还可以省略第一阶段呢?这里其实是MGR 对Paxos 的一个优化,在9.2.3 节中会详细介绍。

虽然三阶段处理的都是控制流的消息,这里还是以三阶段为例来介绍整个流程,方便读者有一个全局的理解,并且三阶段其实也是包含两阶段流程的。

在开始介绍之前,我们先来了解一下MGR 中的状态机对象。针对每个消息,MGR 在每个节点中都有一个状态机对象,状态机对象中保存了这个消息不同阶段的信息,具体的字段如下所示:

/* Paxos 实例定义信息 */
struct pax_machine {
  linkage hash_link;
  stack_machine *stack_link;
  lru_machine *lru;
  // 消息号
  synode_no synode;
  double last_modified; 
  linkage rv;              /* 任务可能会在此处休眠,直到发生有趣的事情. */
  // 接收到发送来的proposer 请求,相关信息会保存在这里.
  struct {
    ballot bal;            /* 我们正在处理的当前投票. */
    // bitmap,用来记录多少节点回复了请求
    bit_set *prep_nodeset; /* 已经对我的预操作请求做出回应的节点. */
    ballot sent_prop;
    bit_set *prop_nodeset; /* 已经对我的提议做出回应的节点. */
    pax_msg *msg;          /* 我们正试图推送的值. */
    ballot sent_learn;
  } proposer;
  // 接受了某个节点的消息会保存在这里.
  struct {
    ballot promise;        /* 承诺不接受任何低于此(值)的提议. */
    pax_msg *msg;          /* 我们已经接受的值. */
  } acceptor;
  // 消息到达learn 阶段会保存在这里
  struct {
    pax_msg *msg;          /* 我们已经得知的值. */
  } learner;
  int lock;
  pax_op op;
  int force_delivery;
  int enforcer;
#ifndef XCOM_STANDALONE
  char is_instrumented;
#endif
};

Paxos 协议整个生命周期都在该状态机下完成。在MGR 中还维护了一个全局的状态机缓存,它的底层是用一个哈希表存储的。可以通过消息号在全局的状态机缓存中获取到对应的状态机对象,如果获取不到则需要新创建,创建完成之后会插入该全局的状态机缓存。

前面提到的proposer_task 收到Paxos 对象消息时,会根据其消息类型来判断进行三阶段还是两阶段流程。下面以三阶段流程为例详细进行介绍,这里还是假设有A、B、C 三个节点,A 节点作为提议者发送请求,B、C 节点则处理收到A 节点的请求。

(1)prepare 阶段

本阶段对应图9-11 左下角到右下角的第1 步和第2 步,A 节点在准备阶段时会设置paxos 消息对象的阶段类型为prepare_op,设置synode、proposal、msg_type 等值。然后将消息循环发送到不同的Server 维护的outgoing 消息通道里面。在MGR 中,本地节点为集群中的每个节点都维护了一个outgoing 消息通道,包括自己的。在发送paxos 消息的时候就是将消息循环发送到这些消息通道中。

每个消息通道都对应一个sender_task 任务进行监听消费,如果是本地的消息通道则对应的是local_sender_task 任务。所以这里分为以下两种情况:

  • 发送到远端:由sender_task 负责,它消费proposer_task 发送到outgoing发送到本地。本地由local_sender_task 根据发来的消息类型进行处理,其处理方式跟其他节点一致,并且调用相同的逻辑,在后面将详细介绍。

所有节点都会运行tcp_server 任务来监听对应的网络套接字,当收到数据后会创建acceptor_learn 任务,由acceptor_learn 任务进行处理。在acceptor_learner 任务中会根据消息的阶段类型进行处理,本次收到的消息的阶段类型为prepare_op,所以这里处理prepare 消息。注意在每次处理Paxos 消息的时候都会为其创建状态机或者从状态机缓存中找到其对应的状态机。

然后A、B、C 节点开始处理prepare 消息,首先看当前节点该消息的状态机中是否已经到达learn 阶段,到learn 阶段表示该消息已经完成了。这里是通过状态机中的learn 字段是否为空来判断的。如果没有处于learn 阶段,就对比消息对象中的proposal 和状态机中的acceptor.promise 的proposal number,消息对象中的大于状态机中的才能进行后续操作。如果小于状态机中的,到这里就处理完毕,不用返回消息给发送方。

这里继续看大于的情况,如果大于会将消息对象的proposal 赋值给状态机对象的acceptor.promise 字段。代码如下所示:

pax_msg *handle_simple_prepare(pax_machine *p, pax_msg *pm, synode_no synode) {
  pax_msg *reply = NULL;
  if (finished(p)) {
    MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(finished(p), d));
    reply = create_learn_msg_for_ignorant_node(p, pm, synode);
  } else {
    int greater =
        gt_ballot(pm->proposal,
                  p->acceptor.promise); 
    MAY_DBG(FN; SYCEXP(synode); BALCEXP(pm->proposal); NDBG(greater, d));
    if (greater || noop_match(p, pm)) {
      if (greater) {
        p->acceptor.promise = pm->proposal; 
      }
      reply = create_ack_prepare_msg(p, pm, synode);
    }
  }
  return reply;
}

然后创建回复的消息,分以下两种情况:

  • 之前接受过其他节点的值。这里通过状态机中保存的accpetor 是否有值进行判断。如果接受过,那么根据Paxos 协议的要求需要把接受的值和对应的proposal number 返回给对应的节点。并且设置Paxos 消息对象的阶段类型为ack_prepare_op,表示回复prepare 并且带有已经接受过的值。

  • 之前没有接受其他节点的值。这里就比较简单了,直接设置Paxos 消息对象的阶段类型为ack_prepare_empty_op,表示回复prepare 消息并且没有接受过其他节点发来的值。

上述流程在本地节点的执行流程也是一致的,不过本地节点不需要回复消息,因为都在本地,但是会创建回复消息对象。在调用回复逻辑的时候,会判断是否是本地节点,如果是本地节点就会直接调用对应的逻辑来处理回复的消息对象,这里的处理逻辑跟处理其他节点的回复的消息对象一致。下面就来看看详细的处理逻辑,对应图9-11 右下角的第2 步,当A 节点收到B、C 节点发来的消息时,同样A 节点也会通过tcp_server 任务读取数据,然后创建acceptor_learn 任务,在acceptor_learn 任务中根据Paxos 消息阶段类型进行处理:

1)检查Paxos 消息是否已经完成,这里会检查状态机对象中learner 字段中的消息是否为空,如果不为空的话则表示该状态机接受了learn 阶段的消息,也就表示该Paxos 完成了所有阶段的处理。

2)设置状态机中proposer.prep_nodeset 的bitmap,在后续判断多数派投票的时候会用到。

3)如果收到的Paxos 消息阶段类型为ack_prepare_op,这时候会判断发送来的Paxos 消息的proposal number 是否大于本地状态机的对应的proposal number,如果大于则将对应的值赋值给本地状态机。

4)如果收到的Paxos 消息阶段类型为ack_prepare_empty_op,这时候会判断发送来的Paxos 消息中回复的proposal number 是否大于状态机中的proposal number,如果大于则检查是否达到多数派,也就是有超过一半的节点回复该请求,这一步通过检查刚刚提到的proposer.prep_nodeset 的bitmap 来实现。如果达到了多数派则可以进行下一个阶段的请求。

相关代码如下:

bool_t handle_simple_ack_prepare(site_def const *site, pax_machine *p,
                                 pax_msg *m) {
  if (get_nodeno(site) != VOID_NODE_NO)
    BIT_SET(m->from, p->proposer.prep_nodeset);
  bool_t can_propose = FALSE;
  if (m->op == ack_prepare_op &&
      gt_ballot(m->proposal, p->proposer.msg->proposal)) { 
    replace_pax_msg(&p->proposer.msg, m);

第9章 MySQL高可用实现

9.2 MySQL组复制与总结

9.2.1 组复制架构与数据流(续)

(2)proposer 阶段

本阶段对应图9-11 左下角到右下角的第3 步,在准备阶段收到了多数派的回复之后,可以开始进行proposer 阶段,该阶段同样由A 节点发起,发送给A、B、C 三个节点。这里发送的Paxos 消息为状态机中的proposer 字段存储的Paxos 消息,这个消息值可能是自己的,也可能是其他节点的,在准备阶段如果遇到其他节点已经接受过值了,这个时候在proposer 阶段发送的值就是该节点的。此时paxos 消息的阶段类型为accept_op。

然后A、B、C 节点的acceptor_learn 任务开始处理proposer 消息,同样还是会看当前节点该消息的状态机中是否已经到达learn 阶段,到learn 阶段表示该消息已经完成了。这里是通过状态机中的learn 阶段是否有消息来判断的。

如果没有到达learn 阶段,则对比状态机中接受者中的proposal number 和发送来的Paxos 消息的proposal number,如果状态机中的小于等于发送来的则继续进行下一步,否则在这里就终止处理了,也不会发送回复消息。

继续进行下一步就是接受发来的proposer 请求的值,也就是将Paxos 消息对象赋值给状态机的acceptor.msg 字段,然后创建回复的消息,此时消息阶段类型为ack_accept_op,然后向A 节点发送回复。

具体代码如下:

pax_msg *handle_simple_accept(pax_machine *p, pax_msg *m, synode_no synode) {
  pax_msg *reply = NULL;
  if (finished(p)) { 
    reply = create_learn_msg_for_ignorant_node(p, m, synode);
  } else if (!gt_ballot(p->acceptor.promise,
                        m->proposal) || 
             noop_match(p, m)) {
    MAY_DBG(FN; SYCEXP(m->synode); STRLIT("accept "); BALCEXP(m->proposal));
    replace_pax_msg(&p->acceptor.msg, m);
    reply = create_ack_accept_msg(m, synode);
  }
  return reply;
}

本阶段对应图9-11 右下角到左下角的第4 步,A 节点收到消息后,同样由acceptor_learn 任务进行处理,处理消息阶段类型为ack_accept_op,详细步骤如下:

  1. 比较状态机中proposer 中的proposal number 和发送来的paxos 消息中的reply_to 中的proposal number 的大小,如果相等则进行下一步,否则则终止处理。
  2. 进入下一步中就开始设置状态机中的proposer.prop_nodeset 的bitmap。
  3. 对比发送来的消息中的proposal number 和状态机中的proposer.sent_learn 的proposal number,如果发送来的消息中的大则进行下一步,否则终止处理。
  4. 进入到下一步就检查proposer 的请求是否有达到多数派的回复,检查的机制跟准备阶段一致,如果达到多数派则进行下一步。
  5. 到这里就开始创建learn 阶段的消息了,这里也分两种情况,看no_duplicate_payload 是否为1,默认情况下为1,创建tiny_learn_op 阶段类型的消息,如果不是则创建learn_op 阶段类型的消息。
  6. 将状态机中的proposer 的proposal number 赋值给状态机中的proposer.sent_learn。
  7. 在创建完learn 阶段的消息后就进入learn 阶段了。

具体代码如下:

static void handle_ack_accept(site_def const *site, pax_machine *p,
                              pax_msg *m) {
  ADD_EVENTS(add_synode_event(p->synode); add_event(string_arg("m->from"));
             add_event(int_arg(m->from));
             add_event(string_arg(pax_op_to_str(m->op))););
  MAY_DBG(FN; SYCEXP(m->synode); BALCEXP(p->proposer.bal);
          BALCEXP(p->proposer.sent_learn); BALCEXP(m->proposal);
          BALCEXP(m->reply_to););
  MAY_DBG(FN; SYCEXP(p->synode);
          if (p->acceptor.msg) BALCEXP(p->acceptor.msg->proposal);
          BALCEXP(p->proposer.bal); BALCEXP(m->reply_to););
  pax_msg *learn_msg = handle_simple_ack_accept(site, p, m);
  if (learn_msg != NULL) {
    if (learn_msg->op == tiny_learn_op) {
      send_tiny_learn_msg(site, learn_msg);
    } else {
      assert(learn_msg->op == learn_op);
      send_learn_msg(site, learn_msg);
    }
  }
}
/* Handle answer to accept */
pax_msg *handle_simple_ack_accept(site_def const *site, pax_machine *p,
                                  pax_msg *m) {
  pax_msg *learn_msg = NULL;
  if (get_nodeno(site) != VOID_NODE_NO && m->from != VOID_NODE_NO &&
      eq_ballot(p->proposer.bal, m->reply_to)) { /* answer to my accept */
    BIT_SET(m->from, p->proposer.prop_nodeset);
    if (gt_ballot(m->proposal, p->proposer.sent_learn)) {
      learn_msg = check_learn(site, p);
    }
  }
  return learn_msg;
}

tiny_learn_op 和learn_op 的区别

  • tiny_learn_op:默认情况下协议都用该类型,该类型在处理的时候需要判断是否需要发送read_op 阶段类型,主要是针对有一些节点可能由于负载较高或者网络延迟,其消息状态比较落后,通过读取的方式来修复对应消息的状态。
  • learn_op:在跳过消息和处理recover_learn_op 等情况下使用,这些情况是非正常协议同步的情况下,不需要通过读取的方式来修复节点消息状态。

(3)learn 阶段

本阶段对应图9-11 左下角到右下角的第5 步,在proposer 阶段收到多数派的回复之后,可以开始进行learn 阶段。其实到了learn 阶段就表示消息肯定能执行成功,可以理解跟MySQL 中这个事务已经提交了一样,无论任何异常情况,最终都能修复这个消息让它完成learn 阶段。learn 阶段跟其他阶段不太一样的是,它无须等待其他阶段的回复,类似一个广播消息告诉其他节点这个Paxos 消息已经达到learn 阶段了。下面看下具体流程。

先是创建并发送learn 消息,learn 消息又分为tiny_learn_op 和learn_op 阶段类型:

  • 创建tiny_learn_op 阶段类型的消息:首先设置消息的阶段类型为tiny_learn_op,然后将状态机中proposal.bal 赋值给消息对象的reply_to,这就是proposal number 的赋值。
  • 创建learn_op 阶段类型的消息:首先设置消息的阶段类型为learn_op,将消息对象中的proposal 赋值给reply_to,这就是proposal number 的赋值。

创建好learn 消息后,由A 节点发送给A、B、C 节点,然后A、B、C 节点的acceptor_learn 任务开始处理learn 消息,这里也会区分learn_op 和tiny_learn_op 阶段类型消息的处理:

  • 处理learn_op 阶段类型消息:同样也是会看当前节点该消息的状态机中是否已经到达learn 阶段,到learn 阶段表示该消息已经完成了。这里是通过状态机中的learn 阶段是否有消息来判断的。如果没有到达learn 阶段,则开始处理learn 消息,其实主要就是将消息对象赋值给状态机中的learner.msg 字段。
  • 处理tiny_learn_op 阶段类型消息:首先会看状态机中的acceptor.msg 是否为空,如果为空的话则需要发送read_op 阶段类型的消息,如果不为空则比较状态机中acceptor.msgproposal 和消息对象中proposal 的proposal number 是否相等,如果相等则跟上述逻辑一样,主要就是将消息对象赋值给状态机中的learner.msg 字段。如果不相等则还是发送read_op 阶段类型的消息。

这里发送回复read_op 阶段类型的消息的原因是,该节点已经收到该消息并且该消息处于learn 阶段了,但是该节点本地的状态机对象中该消息没有完成proposer 阶段,这个时候需要再跟其他节点确认该消息处于什么阶段,此时需要发送read_op 阶段类型的消息,当其他节点收到该阶段类型的消息时会判断消息是否达到learn 阶段,如果到达learn 阶段就发送回复。

(4)Paxos 消息应用

Paxos 消息的应用对应领导者和跟随者的GCS 层和MySQL 层,上述Paxos 协议的三个阶段都介绍完毕了,在协议层数据都同步完成了,那消息又是怎么执行的呢?

在XCOM 层有个executor_task,它的作用就是定时扫描消息然后进行执行,不过执行的条件是需要消息达到learn 阶段。刚刚上述消息已经达到learn 阶段了,这个时候executor_task 就会扫描到它然后开始进行执行。不过在executor_task 中不做具体的执行逻辑,它只是会封装对应的对象然后带上Paxos 消息,最终将该对象发送到GCS Proxy 层的m_notification_queue 队列中。

然后由process_notification_thread 线程来消费m_notification_queue 队列进行处理,处理完成后再发往GCS 层的incoming 队列,最终由applier_thread_handle 应用线程进行消息的应用。这里应用也分为两种情况:

  • 领导者节点(请求发送方)应用:在领导者节点,消息的应用其实很简单,它不需要真实地去应用具体的数据,因为领导者节点是数据写入方,在MySQL 层它的数据已经写入正在等待提交,领导者节点的应用其实就是给MySQL 层解锁,让其能进行后续的提交动作。
  • 跟随者节点(请求接收方)应用:在跟随者节点,应用就相对复杂一些了,因为它需要将数据应用到MySQL 层,其实也就是写入中继日志中,下面具体来看看。

在MGR 中,写入中继日志主要分为以下三个阶段:

  1. event_cataloger:处理Transaction_context_log_event,标记事务开始。
  2. certification_handler:处理GTID_EVENT,将事务进行认证,主要对比是否有冲突,没有冲突则完成认证(只有在GTID_EVENT 的时候需要进行认证),在后面有单独的小节详细介绍冲突检测。
  3. applier_handler:处理事务中其他的事件,调用我们熟悉的queue_event 方法将每个事件写入中继日志中,在正常的主从复制中,I/O 线程也是调用的该方法。

可以看到,MGR 最终只是保证写入中继日志中,并不是要等待从库的事务提交,这一点跟MySQL 的半同步复制一样。所以大家需要理解一点,MGR 能保证数据强一致,但是不能保证实时一致,因为SQL 线程在应用数据的时候还是可能会产生延迟。

至此,MySQL 层的一条数据更新就执行完成了。可能有的读者会想到,Paxos 增加领导者和各个节点的交互,肯定会影响性能。这是必然的,在保证强一致的情况下势必会影响性能,不过MGR 也做了一些优化,比如正常的情况下不需要三阶段只需要两阶段,这样就减少了一次交互。并且在MGR 内部还有batch 的优化机制,这里由于篇幅限制就不再详细介绍。有相关的测试表明MGR 的性能在高并发情况下会优于半同步复制,其原因是MGR 能做batch 处理,而半同步复制的ACK 则是只能每个事务对应一个,不能做batch 处理。

9.2.3 MGR Paxos 协议优化

前面介绍的MGR 都是单主模式下的,这个时候进行两阶段没有问题,因为没有争抢。那如果是多主模式下它也能进行两阶段,这是怎么做到的呢?其实这里采用了Mencius 类似的思想,保证同一轮中只有一个提议者能发起请求。

MGR 消息流转图如图9-15 所示。

(图9-15 MGR 消息流转图)
节点1 节点2
(msgno: 100, nodeno: 0)       (msgno: 100, nodeno: 1)
(msgno: 101, nodeno: 0)       (msgno: 101, nodeno: 1)
(msgno: 102, nodeno: 0)       (msgno: 102, nodeno: 1)
...                            ...

由于篇幅原因,这里只画出了两个节点,实际上是三个节点。可以看出,每个节点维护的是相同的msgno 序列,但是不同节点对应了不同的nodeno,例如针对msgno 100,第一个节点就对应(100,0),第二个节点对应(100,1),第三个节点对应(100,2)。这样虽然三个节点的msgno 是一致的,但是通过nodeno 区分,这样可以保证每个节点的维护的槽是独立的。

那MGR 是如何运用msgno 和nodeno 来区分独立的槽的?下面简单介绍一下相关流程。

首先我们需要了解下MGR 中一个比较重要的结构体:

struct synode_no {
  uint32_t group_id;
  // 多个节点msgno 相同
  uint64_t msgno;
  // node number 不一样
  node_no node;
};

这个结构体代表一个独立的槽编号,在MGR 中提议者发起请求的时候,会获取当前的msgno,然后封装成synode_no 对象,最终存储在Paxos 消息对象中,对应的就是之前介绍的Paxos 消息对象中的synode 字段。同样在提议者发起请求的时候会初始化状态机,然后会将状态机存储在状态机缓存中,状态机缓存底层实际是一个全局的哈希表,哈希表的key 是利用synode_no 值生成的,生成方式如下:

static unsigned int synode_hash(synode_no synode) {
  /* 需要分别对三个字段进行哈希处理,因为结构体可能包含具有未定义值的填充. */
  return (unsigned int)(4711 * synode.node + 5 * synode.group_id +
                        synode.msgno) %
         BUCKETS;
}

同样,在其他节点收到Paxos 消息的时候,也会创建状态机对象并插入全局哈希表中,对应的key 采用Paxos 消息中的synode 字段生成。

在进行后面的交互的时候,例如返回ack_accept_op 消息,发送learn_op 消息等,Paxos 消息对象中的synode 是一样的,对应的状态机对象也是一样的,状态机对象只用在第一次创建,后续直接从全局的状态机缓存中获取即可。这样其实就保证了Paxos 协议所有阶段的消息唯一对应每个节点的状态机对象,进而保证每个节点发送proposer 请求的时候,它在整个集群都是唯一的。最终就把每个槽号区分开来,保证各个节点发送的proposer 请求不会发生冲突。

下面我们举个简单的例子:节点A 发起proposer 请求,槽号为(100,0),节点A 创建状态机对象并且插入全局状态机缓存中,节点B 和节点C 收到消息后创建对应的状态机对象并插入其全局状态机缓存中。

节点B 发起proposer 请求,槽号为(100,1),节点B 创建状态机对象并且插入全局状态机缓存中,节点A、节点C 收到消息后创建对应的状态机对象并插入其全局状态机缓存中。

节点C 发起proposer 请求,槽号为(100,2),节点C 创建状态机对象并且插入全局状态机缓存中,节点A、节点B 收到消息后创建对应的状态机对象并插入其全局状态机缓存中。

这样的话在每个节点的全局状态机缓存中存在槽号为(100,0)、(100,1)、(100,2) 对应的状态机对象,然后这三个槽号对应的后续流程都围绕其状态机缓存操作,互不影响。

不过,虽然它们的Paxos 协议的各个节点不影响,但是执行的时候是有影响的,因为需要保证全局的顺序性。在MGR 中是预先定义好了顺序,其实就是按照nodeno 排序,例如上述三个槽对应的执行顺序如下:

(100,0)
(100,1)
(100,2)

那MGR 是如何保证它们的执行顺序的呢?这就跟executor_task 任务的机制相关了。在executor_task 任务中,执行同一个msgno 的时候,消息按照nodeno 的顺序执行,具体代码逻辑如下:

// 默认情况下node number 自增,如果node number 大于等于了集群节点数量,那么就增加msgno.
synode_no incr_synode(synode_no synode) {
  synode_no ret = synode;
  ret.node++;
  if (ret.node >= get_maxnodes(find_site_def(synode))) {
    ret.node = 0;
    ret.msgno++;
  }
  
  return ret; 
}

在上述代码逻辑上层一直循环执行消息,执行完一条消息就调用上述逻辑执行下一条。不过这里有一个问题,只有到达不过这里有一个问题,只有到达learn 阶段的消息才能进行执行,例如槽号为(100,0) 已经达到learn 阶段了,这个时候就可以执行了。

那如果槽号(100,0) 一直没有到达learn 阶段,或者说它对应的A 节点根本没有请求,也没有发出proposer 请求,这个时候后续的槽号怎么处理呢?

这种情况需要处理间隙,因为是顺序执行的,中间如果有的槽号就是没有触发到达learn 阶段,这个时候就需要有机制帮助它完成learn 阶段,Mencius 同样也有这个问题。下面看看MGR 是如何处理的。

在MGR 中针对上述情况有被动处理和主动处理两种方式。

  • 被动处理:节点A 维护(100,0),此时节点没有任何请求,节点B 维护的槽号(100,1),此时已经达到learn 阶段,当节点A 收到节点B 发来的learn_op 消息的时候会触发激活sweeper_task,sweeper_task 会扫描当前的synode,这个时候会发现槽号(100,1) 已经完成learn 阶段,而槽号(100,0) 是未使用的状态,此时会主动跳过该槽号,跳过的流程就是直接发送一个消息阶段类型为skip_op 的Paxos 消息对象,其他节点收到后直接进行处理跳过,这样executor_task 就能执行后续的消息了。

  • 主动处理:节点B 维护(100,1),发起proposer 请求,槽号为(100,1),此时executor_task 运行时发现正在执行的消息(100,0)。这个时候会检查该槽号,主要看该槽号对应的状态机是否处于空闲状态,如果是则需要根据槽号创建阶段类型为read_op 的Paxos 消息对象,最终发送给其他节点,然后等待其他节点回复,其他节点只有在对应的消息到达learn 阶段后才会发送回复。如果节点B 没有收到回复,则会重试几次,如果一直没有收到回复,就会针对该槽号(100,0) 发起三阶段,也就是有准备阶段,目的是跳过该槽号。

默认情况下,被动处理就能处理间隙的槽号,如果对应的槽号的节点负载过高或者因为网络不好等情况无法及时触发被动处理逻辑,这个时候就需要集群的某个节点主动触发,发起三阶段来跳过该消息,只要达到多数派就能跳过。这个时候,即使维护该槽号的节点无法通信,集群也能跳过该槽号继续进行后续的操作。

按照这种逻辑,我们可以想到,在MGR 开启多主模式下,如果各个节点的请求不是很均匀,那么会经常发生槽号跳过,如果有大量的槽号跳过势必会影响集群整体的性能,并且如果集群某个节点负载过高或者遇到网络通信不好等情况,则会加剧该影响。所以在我们日常的使用中,开启多主模式前一定要详细评估,在使用的过程中也要定期进行监控。

9.2.4 MGR 冲突检测

冲突检测是在MySQL 层检测数据是否存在冲突,前面已经多次提到,那什么情况会进行冲突检测呢?

  • 在主从切换之后会进行冲突检测。主从切换之后新的领导者之前是跟随者,可能存在没有应用完成的数据,这种情况领导者在写入的时候需要进行冲突检测。这个阶段完成之后,后续的写入不需要进行冲突检测。
  • 在多主模式下启动的时候会让所有节点都开启冲突检测,后续每条写入都需要进行冲突检测。

其实冲突检测主要是为了防止业务方在双写的时候发生冲突,如果业务方能控制写入顺序或者区分单元,那么其实MySQL 基本上不会发生冲突。

了解了冲突检测的场景后,下面来介绍MGR 中冲突检测是如何实现的。MGR 冲突检测的流程如图9-16 所示。

(图9-16 MGR 冲突检测的流程)
节点1(领导者)                    节点2(领导者)
   transaction context event           transaction context event
   GTID_EVENT                          GTID_EVENT
   QUERY_EVENT                         QUERY_EVENT
   TABLE_MAP_EVENT                     TABLE_MAP_EVENT
   ROWS_EVENT                          ROWS_EVENT
   XID_EVENT                           XID_EVENT
   binlog 缓存                         binlog 缓存
   snapshot_version                    snapshot_version
   broadcast_gtid_executed             broadcast_gtid_executed
   process_notification_thread         process_notification_thread
   WriteSet                            WriteSet
   事务消息                            事务消息
   WriteSet: GTID                      WriteSet: GTID
   WriteSet: GTID                      WriteSet: GTID
   WriteSet: GTID                      WriteSet: GTID
   WriteSet: GTID                      WriteSet: GTID
   Certification_info (MAP)            Certification_info (MAP)
   清理执行过的GTID                    清理执行过的GTID
   发送执行过的GTID                    发送执行过的GTID
   更新或者插入                        更新或者插入
   获取WriteSet                       获取WriteSet
   进行比较                            进行比较

由于篇幅原因,这里只画出了两个节点,实际上正常是三个节点。可以看到上面两个节点各自维护自己的binlog 缓存和WriteSet,在事务提交时通过broadcast_gtid_executed 交换已执行的GTID 信息,并通过Certification_info (MAP) 进行冲突检测,最终通过比较WriteSet 来决定事务是否冲突。

(后续内容在第9章后半部分,包括具体的冲突检测算法等,但原文在此结束,本部分内容完整。)


至此,第9章关于MySQL组复制与总结的内容已完整转换。

9.2 第9章 MySQL组复制与总结

9.2.4 冲突检测流程

背景说明

由于篇幅原因,图中只画出了两个节点,实际上正常是三个节点。两个节点均为领导者(多主模式),两个节点都能写数据。图9-16中假设请求从左侧的节点发出,生成对应的事务数据发送到右侧的节点,右侧节点收到事务数据后进行冲突检测。同样右侧节点也可以发送请求,此处同样由于篇幅原因,只画出了左侧请求的流程。

图9-16 MGR冲突检测的流程(文本描述替代,附Mermaid图)

flowchart TD
    subgraph 节点1(领导者)
        A1[事务提交] --> B1[生成WriteSet]
        B1 --> C1[生成snapshot_version]
        C1 --> D1[存储到transaction context event]
        D1 --> E1[通过Paxos发送事务消息]
    end
    subgraph 节点2(领导者)
        A2[接收到事务消息] --> B2[应用transaction context event]
        B2 --> C2[获取WriteSet和snapshot_version]
        C2 --> D2[应用GTID_EVENT时触发冲突检测]
        D2 --> E2[循环遍历WriteSet]
        E2 --> F2{在Certification_info中查找对应记录?}
        F2 -->|存在| G2{GTID是snapshot_version子集?}
        F2 -->|不存在| H2[不冲突,继续]
        G2 -->|是| H2
        G2 -->|否| I2[冲突,返回报错]
        H2 --> J2[生成GTID(从预分配范围获取)]
        J2 --> K2[将GTID加入snapshot_version]
        K2 --> L2[将WriteSet插入Certification_info]
        L2 --> M2[计算last_committed和sequence_number]
    end
    subgraph 节点2额外线程
        N2[定期广播已执行GTID]
        O2[清理Certification_info中已完成的GTID]
    end

冲突检测的核心思想

针对事务中的每条记录,存储其WriteSet记录以及对应的GTID信息,然后在冲突检测时检查本地是否存在相同的WriteSet记录,并比较其GTID信息。如果请求方的GTID大于本地的,说明可以执行;如果请求方的GTID小于本地的,则说明在此之前有其他请求更新了该记录,此时发生冲突。

这里的WriteSet就是之前在介绍MySQL 8.0并行复制时提到的概念。

具体冲突检测流程

步骤1:生成WriteSet和snapshot_version

在领导者侧(发送方)生成WriteSet信息,生成规则与MySQL 8.0并行复制一致。然后生成snapshot_version,snapshot_version实际上就是MySQL Server已经执行的最大GTID。这两个信息会存储在transaction context event中。

步骤2:发送事务消息

通过Paxos协议将事务消息发送到其他节点上,其他节点收到事务消息后开始应用。首先应用transaction context event,获取发送过来的WriteSet和snapshot_version信息。然后在应用GTID_EVENT时调用冲突检测的逻辑。

步骤3:冲突检测

进入冲突检测逻辑后,首先判断是否需要冲突检测(需要进行冲突检测的情况已在前面介绍)。循环遍历事务带过来的每个WriteSet元素,在Certification_info中查找是否有对应的记录:

  • 如果有,获取对应的GTID信息。
  • 判断该GTID是否属于snapshot_version的子集。
    • 如果是:表示领导者端的GTID更新一些,不冲突。
    • 如果不是:表示跟随者的GTID更新一些,说明有节点在此之前更新了这条记录,发生冲突。

按照上述逻辑检测WriteSet中的每条记录,如果都不冲突则继续后续操作;如果冲突则直接结束并返回报错。

步骤4:生成GTID

如果不冲突,从预先分配好的范围中获取GTID信息。然后将获取到的GTID加入snapshot_version中。

GTID的gno申请时机

  • 在MGR中,gno实际上是在冲突检测完成之后才申请的。在领导者端也是在事务进入提交阶段、组提交的第一个阶段写binlog时申请的。
  • 在MGR中,事先为每个节点分配好一个GTID范围,基于已执行的GTID计算得出。
  • 事务执行时,领导者节点和跟随者节点分别申请gno,MGR可以保证同一事务申请到的gno相同。

步骤5:插入Certification_info

将事务的所有WriteSet插入本地节点的Certification_info中,以便后续再次进行冲突检测时使用。

步骤6:计算last_committed和sequence_number

计算last_committed和sequence_number信息(并行复制最重要的两个字段)。生成规则与并行复制思想一致,主要看是否有操作相同的数据,这里很好地利用了Certification_info。如果发现了相同的数据,那么后续的事务就不能并行应用了,此时重新计算last_committed和sequence_number。

至此,事务的冲突检测完成。实现非常巧妙——借用了并行复制WriteSet的思想,又复用该思想使其后续在SQL线程能并行执行。

Certification_info的清理机制

细心的读者可能会想到的问题

Certification_info在高并发场景下会变得非常大,如果不及时清理会造成内存占用率非常高。MGR设计了一套清理机制。

  • 每60秒MGR主动进行清理。
  • 清理依据:清理在所有节点端完成执行的GTID对应的数据。
  • MGR在每个节点开启一个线程,定期广播自己已经执行完成的GTID信息。
  • 当节点收到这个信息后,与自己已执行的GTID信息求交集,最终得到可以清除的GTID信息。
  • 然后扫描Certification_info中对应的GTID,将这些数据清理出去。

9.2.5 MGR流控

上述介绍了MGR的数据同步流程。可以看到,在领导者端如果并发较高,跟随者端可能产生延迟。在一些极端情况下(例如网络问题、跟随者硬件问题等),会造成跟随者的数据过于落后。如果此时发生故障迁移,延迟的节点提升为领导者,就会影响业务。

流控机制的核心思想:以集群中处理最慢的节点为标准进行限速,保证所有节点都能跟上,不会造成过大的延迟。

限速位置与实现

流控在回调函数 group_replication_trans_before_commit 中发送事务消息之前调用。具体逻辑如下:

int32 Flow_control_module::do_wait() {
  DBUG_TRACE;
  int64 quota_size = m_quota_size.load();
  int64 quota_used = ++m_quota_used;
  if (quota_used > quota_size && quota_size != 0) {
    struct timespec delay;
    set_timespec(&delay, 1);
    mysql_mutex_lock(&m_flow_control_lock);
    mysql_cond_timedwait(&m_flow_control_cond, &m_flow_control_lock, &delay);
    mysql_mutex_unlock(&m_flow_control_lock);
  }
  return 0;
}

逻辑说明:对比目前已经处理的事务数量和能够处理事务数量的额度,一旦超额度则进行限流。限流动作很简单:等待1秒,1秒过后再进行后续事务发送。

关键指标:quota_used 和 quota_size

  • quota_used:每次发送事务消息时 m_quota_used 累加1,然后赋值给 quota_usedm_quota_used 在每次重新计算 quota_size 时重置。

  • quota_size:由计算比较多个节点得到的最终值。

    • MGR中有一个认证线程,内部有定时进行流控的动作。
    • 认证线程给其他节点发送:自己当前等待认证的事务数量、等待应用的事务数量、已经认证的事务数量等信息。
    • 其他节点收到后记录到自己的内存中,并与上一次的数据对比,得到这些数据的差集,从而获得每个节点的处理能力。
    • 然后比较所有节点的数据,取最小的值作为 quota_size,但不能低于设置的最小阈值。

quota_size每秒计算一次,计算时重置 m_quota_used 的值。因此每秒判断时 quota_size 可能不同(变大或变小),从而影响领导者的并发。


9.3 总结

本章主要探讨了MySQL的高可用实现方式,包括主从复制组复制

主从复制

  • MySQL实现高可用的重要方式,经历了多个发展阶段。
  • 包括全量同步和增量同步,增量同步基于binlog日志复制,通过GTID或position方式确定同步位点。
  • 半同步复制:在主从复制基础上,确保主库事务提交前至少有一个从库接收并写入中继日志,提高了数据安全性。
  • 并行复制:从MySQL 5.6到8.0不断改进,通过不同策略提高从库应用事务的速度,解决主从复制延迟增大的问题。

组复制

  • MySQL 5.7版本引入,以MySQL插件方式实现,底层基于类Paxos强一致协议。
  • 架构复杂,包括插件管理、GCS、GCS XCOM Proxy、XCOM四层,通过类Paxos协议实现数据流的强一致同步。
  • 多主模式下采用Mencius思想优化,保证同一轮中只有一个提议者能发起请求,并通过冲突检测流控机制确保集群性能和数据一致性。
  • 解决了MySQL高可用和数据一致性问题,但在多主模式下需注意性能影响。

总体评价

这些技术的发展和应用,为MySQL提供了更可靠、高效的高可用解决方案,满足了不同业务场景的需求。