设计要求
基于 Raft 协议(包含 snapshot 机制),实现一个可以容错的分区 Key/Value 数据库。有以下几个具体的要求:
- 每个 Replica Group 负责存储一部分 K/V 数据,通过 Raft 实现一致性。
- Shard Master 负责管理配置信息,决定如何分片,通过 Raft 实现一致性。
- 客户端通过 Shard Master 得知所有的分片信息,Replica Group 定期拉取最新的配置。
- 支持分片在不同的 Replica Group 中移动,应对 Replica Group 下线或者重新上线的情况。
- 在变更配置的过程中,如果一个 key 不需要迁移,可以继续提供对外服务。
- 变更配置后,原有的节点不需要保存过期的数据。
整体架构
整体结构是一个典型的 Master/Worker 结构。实现的功能比较基础,有些功能没有实现,比如,Raft Group 的结点不能变更,Client 不允许并发操作。

Shard Master 实现了以下的功能:
- 存储配置信息,配置信息包含了 Replica Group 服务器的地址,以及每个 Replica Group 具体负责处理哪些 Shard。
- 分配 Shard 给不同的 Replica Group 处理,每个 Replica Group 分配的 Shard 数量要尽可能平均。
- 响应
Join/Leave/Move/QueryRPC 请求,并且处理配置的变更。
Replica Group 实现了以下的功能:
- 存储对应的 Shard,响应客户端对于 Shard 的
Get/Put/Append请求。 - 周期性向 Shard Master 询问配置信息是否变更,如果有变更及时完成数据的迁移。
- 在数据分片迁移的过程中,不能响应客户端对于当前数据分片的请求,直到分片迁移完成。
Shard Master 实现
RPC 请求(Join/Leave/Move/Query)的实现
主要分为客户端和服务端两个方面。
客户端:
- 每个客户端分配一个指定的序号,同时每个请求分配一个递增的请求序号,保证所有的操作只执行一次。
- 每个客户端同时只能有一个请求等待响应,不能同时发送多个请求。
服务端:
- 服务端需要一个单独的线程执
apply操作,接收所有通过 Raft 达成共识的消息。 - 在接收到消息的时候,需要执行去重判断(根据客户端的请求序号),保存一个
map存储每个客户端最大的请求序号,只有请求序号超过当前客户端最大的请求序号的请求才能执行。 - 客户端需要等到请求达成共识后才能返回。
服务端如何才能判断客户端的请求达成共识?
我们可以根据 Raft 协议的安全性实现。Raft 需要满足 State Machine Safety,定义如下:
State Machine Safety: if a server has applied a log entry at a given index to its state machine, no other server will ever apply a different log entry for the same index.
这个特性说明如果在某个 index 上的消息达成了共识,那么这个 index 上的消息就不会改变。
因此,Client 在写入 Command 并获得写入的 index 后,创建对应 index 的 channel 并监听,如果 channel 返回的命令和发送的命令相等,表示命令执行成功。
Reconfigure
当 Shard Master 接收到 Join/Leave 消息时,需要对于分片数据进行重新划分。有以下几个划分思路:
- 一致性哈希。把整个数据分片作为一个哈希环,每一个 Replica Group 计算一个哈希值并映射到哈希环上,按照一个顺序(顺时针 / 逆时针)把哈希环上的每个分区的数据放到对应的下一个 Replica Group 中。这个做法不能保证 Shard 的平均分配。
- 范围分片。为每个 Relica Group 划分一个范围,负责这个范围内所有数据的读取写入操作。这个做法可以保证分区需要处理的数量是基本平衡的。
当有 Replica Group 加入或者退出,该怎么处理?
最简单的方法是直接重新分片。计算每个分片需要处理多少个分区数据,然后依次遍历每个 Replica Group 并进行分配。目前采用这个版本实现分片重新配置的算法。这个方法的问题在于可能会导致大量的数据在网络上传输,因为这样做会导致原本不需要迁移的数据因为换了一个 Replica Group 而需要迁移,造成了网络带宽的浪费。
另一种方法的出发点是只传输必要的数据量。首先计算出每个 Replica Group 需要存储的最小数据分片数量(因为这是需要传输数据量的最小值)。如果是加入操作,那么挑选当前数据分片最多的 Replica Group,每次迁出一个 Shard,直到传输数量和计算的数量相等。迁出操作和加入操作相反,挑选当前数据分片最少的 Replica Group,每次迁入一个 Shard。
Replica Group 实现
KV 数据的存储方式
由于设计需要支持迁移,并且迁移完成后删除不需要的数据,把服务器上所有的数据保存在同一个 map 中不是一个合适的行为,这样在遍历过程中需要访问所有数据,效率不高。因此,数据存储的方式应该是分别存放每一个 Shard 的数据。Shard 整体的结构见下。
1 | type Shard struct { |
DB 保存了具体的数据,LastSerial 存储了每个 Client 对应的最大 RPC 请求编号。用于保证一致性的实现。
感知配置的变化
Add code to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn’t responsible for the client’s key’s shard.
因此,为了感知配置的变化,Replica Group 的 leader 需要定期向 Shard Master 询问当前的配置是否过期,如果过期了,leader 需要给所有的 follower 发送一条消息,告知配置过期,需要更新配置。
更新配置的步骤如下:
- 如果一个分区当前属于这个节点,但是下一个配置需要转移出去,那么这个分区就暂时不能提供服务,并把它缓存到一个队列中。
- 所有过期数据全部收集完毕后,开始分区转移流程。
分区转移
假设分区
S的旧 Replica Group 为G1,新 Replica Group 为G2。
实现分区的迁移有以下两种思路:
G2向G1拉取分区S。G1向G2推送分区S。
这两种方法都是可行的,考虑到需要实现删除过期数据的要求,我的实现采用了第二种方法,也就是推送的方法,这种方法在对方接收到发送的 Shard 之后就可以删除不需要的 Shard,而第一种方法则需要多发送一次 RPC。
分区转移的发送方流程如下:
- 根据配置信息,获取每一个在缓存队列的 Shard 的目的地址。
- 把 Shard 封装为分区转移请求并发送过去,等待目的 Replica Group 把发送的 Shard 接收。
- 接收完成后,删除对应的缓存 Shard。
为了实现上述的过程,用于迁移的缓存数据结构设计如下:
1 | type Migrate struct { |
其中,Config 字段表示这个数据属于哪个 configuration。
分区转移的接收方需要判断接收到的 Shard 是否需要自己处理。如果有的 Replica Group 感知配置变更的速度较慢,或者网络乱序,接收方都有可能收到过期的分区转移请求。接收方接收到一个分区后,执行以下的处理操作:
- 无论如何,接收到的 Shard 在某个配置中自己负责的,因此需要通过 Raft 告知 Replica Group 中的 Follower 接收到了新的分区数据,并且通知发送方删除。
- 如果当前分区是自己在当前配置需要处理的,那么直接保存,并且让该分区可以对外提供服务。
- 如果当前分区不是自己在当前配置需要处理的,缓存到缓存队列。,找到它在接下来的配置中的目的地址,构造分区转移请求,执行分区转移的发送方法。
这个方法要求每个 Replica Group 都需要知道所有的配置变更信息。因此,感知配置变更的时候,每个 Replica Group 查询的是是否存在当前配置的下一个配置,而不是查找最新的配置。
如果不这样做,可能会出现以下情况:
- 假设分区
S的旧 Replica Group 为G1,新 Replica Group 为G2。 - 发生配置变更,
G1发送S给G2,G2的响应丢失,G1没能成功删除S。 G2响应了对S的写入请求。- 配置再次发生变更,新 Replica Group 为
G3,此时G1,G2都发送S给G3,可能会出现G1直接更新到G3的情况。
不查询最新的配置可以避免出现上述情况。