dragonboat: minor cleanups
lni committed Jun 12, 2022
1 parent 4d02b36 commit 6ae1d0b
Showing 46 changed files with 335 additions and 338 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -14,7 +14,7 @@
## About ##
Dragonboat is a high performance multi-group [Raft](https://raft.github.io/) [consensus](https://en.wikipedia.org/wiki/Consensus_(computer_science)) library in pure [Go](https://golang.org/).

- Consensus algorithms such as Raft provide fault tolerance by allowing a system to continue operating as long as a majority of its member servers are available. For example, a Raft cluster of 5 servers can make progress even if 2 servers fail. To clients, the system appears as a single entity that always provides strong data consistency. All Raft replicas can be used to handle read requests for aggregated read throughput.
+ Consensus algorithms such as Raft provide fault tolerance by allowing a system to continue operating as long as a majority of its member servers are available. For example, a Raft shard of 5 servers can make progress even if 2 servers fail. To clients, the system appears as a single entity that always provides strong data consistency. All Raft replicas can be used to handle read requests for aggregated read throughput.

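To make the quorum arithmetic above concrete, the illustrative Go snippet below (not part of Dragonboat's API) computes the quorum size and the number of tolerable failures for a few replica counts.

```go
package main

import "fmt"

// quorum returns the number of replicas that must be reachable for a Raft
// group of the given size to make progress.
func quorum(replicas int) int { return replicas/2 + 1 }

// tolerableFailures returns how many replicas can fail while the group still
// retains a quorum.
func tolerableFailures(replicas int) int { return replicas - quorum(replicas) }

func main() {
	for _, n := range []int{3, 5, 7} {
		fmt.Printf("%d replicas: quorum=%d, tolerates %d failures\n",
			n, quorum(n), tolerableFailures(n))
	}
}
```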
Dragonboat handles all technical difficulties associated with Raft so that users can focus on their application domains. It is also very easy to use; our step-by-step [examples](https://github.com/lni/dragonboat-example) can help new users master it in half an hour.

@@ -25,7 +25,7 @@ Dragonboat handles all technical difficulties associated with Raft to allow user
* Fully pipelined and TLS mutual authentication support, ready for high latency open environment
* Custom Raft log storage and transport support, easy to integrate with latest I/O techs
* Prometheus based health metrics support
- * Built-in tool to repair Raft clusters that permanently lost the quorum
+ * Built-in tool to repair Raft shards that permanently lost the quorum
* [Extensively tested](/docs/test.md) including using [Jepsen](https://aphyr.com/tags/jepsen)'s [Knossos](https://github.com/jepsen-io/knossos) linearizability checker, some results are [here](https://github.com/lni/knossos-data)

All major features covered in Diego Ongaro's [Raft thesis](https://github.com/ongardie/dissertation/blob/master/stanford.pdf) have been supported -
2 changes: 1 addition & 1 deletion benchmark_test.go
@@ -255,7 +255,7 @@ func BenchmarkWorkerReady(b *testing.B) {
rc := newWorkReady(1)
b.RunParallel(func(pbt *testing.PB) {
for pbt.Next() {
- rc.clusterReady(1)
+ rc.shardReady(1)
}
})
}
4 changes: 2 additions & 2 deletions client/session.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion client/session_test.go
@@ -26,7 +26,7 @@ func TestNoOPSessionHasExpectedSeriesID(t *testing.T) {
t.Errorf("series id unexpected")
}
if cs.ShardID != 120 {
t.Errorf("cluster id unexpected")
t.Errorf("shard id unexpected")
}
}

18 changes: 9 additions & 9 deletions config/config.go
@@ -63,7 +63,7 @@ const (

// Config is used to configure Raft nodes.
type Config struct {
- // ReplicaID is a non-zero value used to identify a node within a Raft cluster.
+ // ReplicaID is a non-zero value used to identify a node within a Raft shard.
ReplicaID uint64
// ShardID is the unique value used to identify a Raft group that contains
// multiple replicas.
@@ -172,7 +172,7 @@ type Config struct {
DisableAutoCompactions bool
// IsNonVoting indicates whether this is a non-voting Raft node. Described as
// non-voting members in the section 4.2.1 of Diego Ongaro's thesis, they are
- // used to allow a new node to join the cluster and catch up with other
+ // used to allow a new node to join the shard and catch up with other
// existing nodes without impacting the availability. Extra non-voting nodes
// can also be introduced to serve read-only requests.
IsNonVoting bool
@@ -187,8 +187,8 @@ type Config struct {
//
// Witness support is currently experimental.
IsWitness bool
- // Quiesce specifies whether to let the Raft cluster enter quiesce mode when
- // there is no cluster activity. Shards in quiesce mode do not exchange
+ // Quiesce specifies whether to let the Raft shard enter quiesce mode when
+ // there is no shard activity. Shards in quiesce mode do not exchange
// heartbeat messages to minimize bandwidth consumption.
//
// Quiesce support is currently experimental.
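To show how the fields documented in this hunk fit together, here is a minimal, hypothetical sketch of a config.Config for one replica. Only ReplicaID, ShardID, IsNonVoting, IsWitness and Quiesce come from the documentation above; the remaining field names and values are assumptions based on typical Dragonboat configurations and should be checked against the godoc of the release in use.

```go
package example

import "github.com/lni/dragonboat/v4/config" // module path may differ (e.g. /v3) by release

// newShardConfig returns the configuration for replica 1 of Raft shard 128.
// Only ReplicaID, ShardID, IsNonVoting, IsWitness and Quiesce appear in the
// documentation above; the remaining fields and values are assumptions based
// on typical Dragonboat configurations.
func newShardConfig() config.Config {
	return config.Config{
		ReplicaID:          1,
		ShardID:            128,
		ElectionRTT:        10,    // election timeout measured in logical ticks
		HeartbeatRTT:       1,     // heartbeat interval in logical ticks
		CheckQuorum:        true,  // leaders step down when they lose the quorum
		SnapshotEntries:    10000, // snapshot every 10000 applied entries
		CompactionOverhead: 5000,  // log entries retained after compaction
		IsNonVoting:        false, // true for a catch-up or read-only replica
		IsWitness:          false, // experimental, see above
		Quiesce:            false, // experimental, see above
	}
}
```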
@@ -369,11 +369,11 @@ type NodeHostConfig struct {
// is unlimited.
MaxReceiveQueueSize uint64
// MaxSnapshotSendBytesPerSecond defines how much snapshot data can be sent
- // every second for all Raft clusters managed by the NodeHost instance.
+ // every second for all Raft shards managed by the NodeHost instance.
// The default value 0 means there is no limit set for snapshot streaming.
MaxSnapshotSendBytesPerSecond uint64
// MaxSnapshotRecvBytesPerSecond defines how much snapshot data can be
- // received each second for all Raft clusters managed by the NodeHost instance.
+ // received each second for all Raft shards managed by the NodeHost instance.
// The default value 0 means there is no limit for receiving snapshot data.
MaxSnapshotRecvBytesPerSecond uint64
// NotifyCommit specifies whether clients should be notified when their
@@ -414,7 +414,7 @@ type NodeHostConfig struct {
// When starting Raft nodes or requesting new nodes to be added, use the above
// mentioned NodeHostID values as the target parameters (which are of the
// Target type). Let's say we want to start a Raft Node as a part of a three
- // replicas Raft cluster, the initialMembers parameter of the StartShard
+ // replicas Raft shard, the initialMembers parameter of the StartShard
// method can be set to
//
// initialMembers := map[uint64]Target {
@@ -423,9 +423,9 @@ type NodeHostConfig struct {
// 3: "nhid-zzzzz",
// }
//
- // This indicates that node 1 of the cluster will be running on the NodeHost
+ // This indicates that node 1 of the shard will be running on the NodeHost
// instance identified by the NodeHostID value "nhid-xxxxx", node 2 of the
- // same cluster will be running on the NodeHost instance identified by the
+ // same shard will be running on the NodeHost instance identified by the
// NodeHostID value of "nhid-yyyyy" and so on.
//
// The internal gossip service exchanges NodeHost details, including their
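Continuing the NodeHostID example from the comments above, the sketch below shows one way a NodeHostConfig might enable gossip based addressing. The GossipConfig field names and the import path are assumptions based on recent Dragonboat releases; verify them against the godoc before use.

```go
package example

import (
	"log"

	"github.com/lni/dragonboat/v4"        // module path may differ by release
	"github.com/lni/dragonboat/v4/config"
)

// startNodeHost creates a NodeHost addressed by its NodeHostID through the
// gossip service instead of a fixed RaftAddress. The GossipConfig field
// names are assumptions based on recent releases; check the godoc.
func startNodeHost() *dragonboat.NodeHost {
	nhc := config.NodeHostConfig{
		NodeHostDir:         "/data/dragonboat",
		RTTMillisecond:      200,
		RaftAddress:         "node1.internal:63000",
		AddressByNodeHostID: true, // enable NodeHostID / gossip based addressing
		Gossip: config.GossipConfig{
			BindAddress:      "0.0.0.0:63001",
			AdvertiseAddress: "node1.internal:63001",
			Seed:             []string{"node2.internal:63001", "node3.internal:63001"},
		},
	}
	nh, err := dragonboat.NewNodeHost(nhc)
	if err != nil {
		log.Fatalf("failed to create NodeHost: %v", err)
	}
	return nh
}
```

Replicas started on such NodeHosts then use the gossip assigned NodeHostID strings (the "nhid-..." values above) as the Target values in initialMembers.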
6 changes: 3 additions & 3 deletions docs/devops.md
@@ -1,14 +1,14 @@
# DevOps #

- This document describes the DevOps requirements for operating Dragonboat based applications in production. Please note that incorrect DevOps operations can potentially corrupt your Raft clusters permanently.
+ This document describes the DevOps requirements for operating Dragonboat based applications in production. Please note that incorrect DevOps operations can potentially corrupt your Raft shards permanently.

* It is recommended to use the ext4 filesystem, other filesystems have never been tested.
* It is recommended to use enterprise NVME SSD with high write endurance rating. Must use local hard disks and avoid any NFS, CIFS, Samba, CEPH or other similar shared storage.
- * Never try to backup or restore Dragonboat data by directly operating on Dragonboat data files or directories. It can immediately corrupt your Raft clusters.
+ * Never try to backup or restore Dragonboat data by directly operating on Dragonboat data files or directories. It can immediately corrupt your Raft shards.
* Each Raft group has multiple replicas; the best way to safeguard the availability of your services and data is to increase the number of replicas. As an example, a Raft group can tolerate 2 node failures when there are 5 replicas, while it can only tolerate 1 node failure when using 3 replicas.
* On node failure, the Raft group remains available as long as it still has a quorum. To handle such failures, you can add a non-voting node and start replicating data to it; once it is in sync with the other replicas, promote the non-voting node to a regular node and remove the failed node using the membership change APIs (a sketch follows this list). For nodes affected by intermittent failures such as a short-term network partition or power loss, resolve the network or power issue and restart the affected nodes.
* On disk failure, such as when experiencing data integrity check errors or write failures, it is important to immediately replace the failed disk and remove the failed node using the above described membership change method. To restart nodes with such disk failures, it is important to have the failed disk replaced first to ensure corrupted data is removed. As a refreshed node with no existing data, that node must be assigned a new RaftAddress value to avoid confusing other nodes.
- * When the quorum nodes are gone, you will not be able to resolve it without losing data. The github.com/lni/dragonboat/tools package provides the ImportSnapshot method to import a previously exported snapshot to repair such a failed Raft cluster.
+ * When the quorum nodes are gone, you will not be able to resolve it without losing data. The github.com/lni/dragonboat/tools package provides the ImportSnapshot method to import a previously exported snapshot to repair such a failed Raft shard.
* By default, the RaftAddress value can not be changed between restarts, otherwise the system will panic with an error message.
* When you can't provide a static IP for your nodes, e.g. when IP is dynamically assigned on node restart, you may want to configure a static DNS name for that node and update it on each restart.
* When it is not possible to do so, you can choose to set the AddressByNodeHostID field to enable the gossip feature which is designed to handle dynamic RaftAddress. Check godocs for more details on the gossip feature.
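The sketch below outlines the replace-a-failed-node procedure described in the list above. It uses the v3-era method names referenced elsewhere in these docs (SyncRequestAddObserver, SyncRequestAddNode, SyncRequestDeleteNode); newer releases rename these around the non-voting/replica terminology, so treat this as an outline rather than the exact API.

```go
package example

import (
	"context"
	"time"

	"github.com/lni/dragonboat/v4" // module path and method names may differ by release
)

// replaceFailedNode adds newReplicaID on newTarget as a non-voting member,
// promotes it once it has caught up, and finally removes the failed replica.
// Method names follow the v3-era API; a config change index of 0 lets the
// system pick the current one.
func replaceFailedNode(nh *dragonboat.NodeHost, shardID, failedReplicaID,
	newReplicaID uint64, newTarget string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// 1. add the new node as a non-voting member so it can catch up without
	//    affecting the quorum.
	if err := nh.SyncRequestAddObserver(ctx, shardID, newReplicaID, newTarget, 0); err != nil {
		return err
	}
	// 2. ...wait until the non-voting member is in sync with the others...
	// 3. promote it to a regular voting member.
	if err := nh.SyncRequestAddNode(ctx, shardID, newReplicaID, newTarget, 0); err != nil {
		return err
	}
	// 4. remove the failed replica from the membership.
	return nh.SyncRequestDeleteNode(ctx, shardID, failedReplicaID, 0)
}
```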
4 changes: 2 additions & 2 deletions docs/overview.CHS.md
@@ -59,13 +59,13 @@ Leader: the node that plays the Leader role defined in the Raft protocol. Each Raft group should have

Before a node can be used, it must first be started so that it is loaded and managed by a NodeHost. The StartShard, StartConcurrentShard and StartOnDiskShard methods of NodeHost are used to start the corresponding nodes.

- When the initial members of a Raft cluster are started for the first time, the user must provide the complete initial member list (initial members) of that Raft cluster, and every replica must be started with exactly the same initial member information. This initial membership ensures that all replicas evolve later user-requested membership changes from a consistent member list. When a replica is not an initial member of the Raft cluster but is added later through a membership change (for example SyncRequestAddNode), it does not need the initial member information on its first start; simply set the join parameter to true.
+ When the initial members of a Raft shard are started for the first time, the user must provide the complete initial member list (initial members) of that Raft shard, and every replica must be started with exactly the same initial member information. This initial membership ensures that all replicas evolve later user-requested membership changes from a consistent member list. When a replica is not an initial member of the Raft shard but is added later through a membership change (for example SyncRequestAddNode), it does not need the initial member information on its first start; simply set the join parameter to true.

When a node is restarted, whether it is an initial node or one added later through a membership change, the initial member information does not need to be provided again, and the join parameter no longer needs to be set to true.

## Stopping a Node ##

- Users can call the StopShard method of NodeHost to stop the replica of the specified Raft cluster managed by that NodeHost. A stopped node no longer serves read or write requests, but it can be restarted again using the start-up methods described above.
+ Users can call the StopShard method of NodeHost to stop the replica of the specified Raft shard managed by that NodeHost. A stopped node no longer serves read or write requests, but it can be restarted again using the start-up methods described above.

After a replica is asked to stop by StopShard, if it is in the middle of creating or recovering from a snapshot, it may not stop immediately and may have to wait until the snapshot creation or recovery completes. To avoid such long waits, the user-implemented snapshot creation and recovery methods are given a <-chan struct{} parameter; once the node is asked to stop, that <-chan struct{} is closed, and the user's snapshot creation and recovery code can then decide whether to abandon the in-progress operation so that the stop request is handled quickly.

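A minimal sketch of the start and stop flow described above. The StartShard signature is assumed to mirror the earlier StartCluster API (initial members, join flag, state machine factory, config) and StopShard is assumed to take the shard id; both should be verified against the godoc of the release in use.

```go
package example

import (
	"github.com/lni/dragonboat/v4"        // module path may differ by release
	"github.com/lni/dragonboat/v4/config"
	sm "github.com/lni/dragonboat/v4/statemachine"
)

// startInitialReplica starts one of the three initial members of shard 128.
// The StartShard signature is assumed to mirror the earlier StartCluster API.
func startInitialReplica(nh *dragonboat.NodeHost, cfg config.Config,
	create sm.CreateStateMachineFunc) error {
	initialMembers := map[uint64]dragonboat.Target{ // Target is assumed to alias string
		1: "nhid-xxxxx",
		2: "nhid-yyyyy",
		3: "nhid-zzzzz",
	}
	// join must be false for initial members; a replica added later through a
	// membership change is started with an empty map and join set to true.
	return nh.StartShard(initialMembers, false, create, cfg)
}

// stopReplica stops the local replica of shard 128 managed by this NodeHost.
func stopReplica(nh *dragonboat.NodeHost) error {
	return nh.StopShard(128) // assumed to take the shard id only
}
```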
12 changes: 6 additions & 6 deletions docs/test.md
@@ -11,26 +11,26 @@
## Monkey Testing ##
### Setup ###
* 5 NodeHosts and 3 Drummer servers per process
- * hundreds of Raft clusters per process
+ * hundreds of Raft shards per process
* randomly kill and restart NodeHosts and Drummer servers; each NodeHost usually stays online for a few minutes
* randomly delete all data owned by a certain NodeHost to emulate permanent disk failure
* randomly drop and re-order messages exchanged between NodeHosts
* randomly partition NodeHosts from the rest of the network
* for selected instances, snapshotting and log compaction happen all the time in the background
* committed entries are applied with random delays
* snapshots are captured and applied with random delays
- * background workers keep writing to/reading from random Raft clusters, with stale read checks
+ * background workers keep writing to/reading from random Raft shards, with stale read checks
* client activity history files are verified by linearizability checkers such as Jepsen's Knossos
* run hundreds of above described processes concurrently on each test server, 30 minutes each iteration, many iterations every night
* run concurrently on many servers every night

### Checks ###
* no linearizability violation
- * no cluster is permanently stuck
+ * no shard is permanently stuck
* state machines must be in sync
- * cluster membership must be consistent
+ * shard membership must be consistent
* raft log saved in LogDB must be consistent
- * no zombie cluster node
+ * no zombie shard node

### Results ###
Some history files in Jepsen's [Knossos](https://github.com/jepsen-io/knossos) edn format have been made publicly [available](https://github.com/lni/knossos-data).
@@ -44,7 +44,7 @@ Some history files in Jepsen's [Knossos](https://github.com/jepsen-io/knossos) e
* Ubuntu 16.04 with Spectre and Meltdown patches, ext4 file-system

## Benchmark method ##
- * 48 Raft clusters on three NodeHost instances across three servers
+ * 48 Raft shards on three NodeHost instances across three servers
* Each Raft node is backed by an in-memory Key-Value data store as its RSM (a simplified sketch follows this list)
* Mostly update operations in the Key-Value store
* All I/O requests are launched from local processes
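For reference, a heavily simplified in-memory Key-Value state machine of the kind used as the benchmark RSM could look like the sketch below. It follows the v3-era statemachine.IStateMachine interface (Update taking a []byte, snapshot methods taking a done channel); newer releases changed Update to take an Entry, so adjust accordingly.

```go
package example

import (
	"encoding/json"
	"io"

	sm "github.com/lni/dragonboat/v4/statemachine" // module path may differ by release
)

// kvStore is a simplified in-memory Key-Value state machine following the
// v3-era IStateMachine interface; newer releases pass an Entry to Update.
type kvStore struct {
	data map[string]string
}

// NewKVStore matches the CreateStateMachineFunc shape used when starting a shard.
func NewKVStore(shardID uint64, replicaID uint64) sm.IStateMachine {
	return &kvStore{data: make(map[string]string)}
}

// Update applies a proposed key/value pair encoded as JSON.
func (s *kvStore) Update(cmd []byte) (sm.Result, error) {
	var kv struct{ Key, Val string }
	if err := json.Unmarshal(cmd, &kv); err != nil {
		return sm.Result{}, err
	}
	s.data[kv.Key] = kv.Val
	return sm.Result{Value: uint64(len(cmd))}, nil
}

// Lookup serves the read requests issued by the background test workers.
func (s *kvStore) Lookup(query interface{}) (interface{}, error) {
	return s.data[query.(string)], nil
}

// SaveSnapshot writes out the whole map; done is closed when the node is
// asked to stop, so long-running snapshot work can be abandoned early.
func (s *kvStore) SaveSnapshot(w io.Writer, _ sm.ISnapshotFileCollection,
	done <-chan struct{}) error {
	return json.NewEncoder(w).Encode(s.data)
}

// RecoverFromSnapshot rebuilds the map from a previously saved snapshot.
func (s *kvStore) RecoverFromSnapshot(r io.Reader, _ []sm.SnapshotFile,
	done <-chan struct{}) error {
	return json.NewDecoder(r).Decode(&s.data)
}

func (s *kvStore) Close() error { return nil }
```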
46 changes: 23 additions & 23 deletions engine.go
@@ -157,7 +157,7 @@ func (wr *workReady) notify(idx uint64) {
}
}

- func (wr *workReady) clusterReadyByUpdates(updates []pb.Update) {
+ func (wr *workReady) shardReadyByUpdates(updates []pb.Update) {
var notified bitmap
for _, ud := range updates {
if len(ud.CommittedEntries) > 0 {
@@ -177,7 +177,7 @@ func (wr *workReady) clusterReadyByUpdates(updates []pb.Update) {
}
}

- func (wr *workReady) clusterReadyByMessageBatch(mb pb.MessageBatch) {
+ func (wr *workReady) shardReadyByMessageBatch(mb pb.MessageBatch) {
var notified bitmap
for _, req := range mb.Requests {
idx := wr.partitioner.GetPartitionID(req.ShardID)
@@ -209,7 +209,7 @@ func (wr *workReady) allShardsReady(nodes []*node) {
}
}

- func (wr *workReady) clusterReady(shardID uint64) {
+ func (wr *workReady) shardReady(shardID uint64) {
idx := wr.partitioner.GetPartitionID(shardID)
readyMap := wr.maps[idx]
readyMap.setShardReady(shardID)
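To make the pattern behind these workReady/shardReady methods easier to follow — shard IDs are partitioned across a fixed set of ready maps, and each partition's worker is poked through a signalling channel — here is a self-contained sketch. The helper names are invented for illustration; it is not the actual dragonboat implementation.

```go
package example

import "sync"

// readyShards records which shards in one partition have pending work.
type readyShards struct {
	mu     sync.Mutex
	shards map[uint64]struct{}
}

func (r *readyShards) setShardReady(shardID uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.shards[shardID] = struct{}{}
}

// workReady fans shard readiness notifications out to a fixed number of
// partitions, each drained by its own worker through a buffered channel.
type workReady struct {
	partitions uint64
	maps       []*readyShards
	channels   []chan struct{}
}

func newWorkReady(partitions uint64) *workReady {
	wr := &workReady{
		partitions: partitions,
		maps:       make([]*readyShards, partitions),
		channels:   make([]chan struct{}, partitions),
	}
	for i := uint64(0); i < partitions; i++ {
		wr.maps[i] = &readyShards{shards: make(map[uint64]struct{})}
		wr.channels[i] = make(chan struct{}, 1)
	}
	return wr
}

// shardReady marks the shard as ready and pokes the owning worker, mirroring
// the real method above: partition index -> ready map -> notify channel.
func (wr *workReady) shardReady(shardID uint64) {
	idx := shardID % wr.partitions // the real code delegates this to a partitioner
	wr.maps[idx].setShardReady(shardID)
	select {
	case wr.channels[idx] <- struct{}{}:
	default: // already signalled; the worker drains the whole map per wakeup
	}
}
```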
@@ -421,29 +421,29 @@ func (p *workerPool) workerPoolMain() {
p.unloadNodes()
return
} else if chosen == 1 {
- clusters := p.saveReady.getReadyMap(1)
+ shards := p.saveReady.getReadyMap(1)
p.loadNodes()
- for cid := range clusters {
+ for cid := range shards {
if j, ok := p.getSaveJob(cid); ok {
plog.Debugf("%s saveRequested for %d", p.nh.describe(), cid)
p.pending = append(p.pending, j)
toSchedule = true
}
}
} else if chosen == 2 {
- clusters := p.recoverReady.getReadyMap(1)
+ shards := p.recoverReady.getReadyMap(1)
p.loadNodes()
- for cid := range clusters {
+ for cid := range shards {
if j, ok := p.getRecoverJob(cid); ok {
plog.Debugf("%s recoverRequested for %d", p.nh.describe(), cid)
p.pending = append(p.pending, j)
toSchedule = true
}
}
} else if chosen == 3 {
- clusters := p.streamReady.getReadyMap(1)
+ shards := p.streamReady.getReadyMap(1)
p.loadNodes()
- for cid := range clusters {
+ for cid := range shards {
if j, ok := p.getStreamJob(cid); ok {
plog.Debugf("%s streamRequested for %d", p.nh.describe(), cid)
p.pending = append(p.pending, j)
@@ -917,7 +917,7 @@ func (p *closeWorkerPool) completed(workerID uint64) {
plog.Panicf("close worker %d is not in busy state", workerID)
}
if _, ok := p.processing[shardID]; !ok {
plog.Panicf("cluster %d is not being processed", shardID)
plog.Panicf("shard %d is not being processed", shardID)
}
delete(p.processing, shardID)
delete(p.busy, workerID)
@@ -1420,50 +1420,50 @@ func (e *engine) setCloseReady(n *node) {
}

func (e *engine) setStepReadyByMessageBatch(mb pb.MessageBatch) {
- e.stepWorkReady.clusterReadyByMessageBatch(mb)
+ e.stepWorkReady.shardReadyByMessageBatch(mb)
}

func (e *engine) setAllStepReady(nodes []*node) {
e.stepWorkReady.allShardsReady(nodes)
}

func (e *engine) setStepReady(shardID uint64) {
- e.stepWorkReady.clusterReady(shardID)
+ e.stepWorkReady.shardReady(shardID)
}

func (e *engine) setCommitReadyByUpdates(updates []pb.Update) {
- e.commitWorkReady.clusterReadyByUpdates(updates)
+ e.commitWorkReady.shardReadyByUpdates(updates)
}

func (e *engine) setCommitReady(shardID uint64) {
- e.commitWorkReady.clusterReady(shardID)
+ e.commitWorkReady.shardReady(shardID)
}

func (e *engine) setApplyReadyByUpdates(updates []pb.Update) {
- e.applyWorkReady.clusterReadyByUpdates(updates)
+ e.applyWorkReady.shardReadyByUpdates(updates)
}

func (e *engine) setApplyReady(shardID uint64) {
- e.applyWorkReady.clusterReady(shardID)
+ e.applyWorkReady.shardReady(shardID)
}

func (e *engine) setStreamReady(shardID uint64) {
- e.wp.streamReady.clusterReady(shardID)
+ e.wp.streamReady.shardReady(shardID)
}

func (e *engine) setSaveReady(shardID uint64) {
- e.wp.saveReady.clusterReady(shardID)
+ e.wp.saveReady.shardReady(shardID)
}

func (e *engine) setRecoverReady(shardID uint64) {
- e.wp.recoverReady.clusterReady(shardID)
+ e.wp.recoverReady.shardReady(shardID)
}

func (e *engine) setCCIReady(shardID uint64) {
- e.stepCCIReady.clusterReady(shardID)
- e.commitCCIReady.clusterReady(shardID)
- e.applyCCIReady.clusterReady(shardID)
- e.wp.cciReady.clusterReady(shardID)
+ e.stepCCIReady.shardReady(shardID)
+ e.commitCCIReady.shardReady(shardID)
+ e.applyCCIReady.shardReady(shardID)
+ e.wp.cciReady.shardReady(shardID)
}

func (e *engine) offloadNodeMap(nodes map[uint64]*node) {
(The remaining changed files in this commit are not shown.)
