Skip to content

Commit

Permalink
fix correctness issues arround node reset
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Jan 17, 2024
1 parent 812432a commit cc12036
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
10 changes: 5 additions & 5 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl Chitchat {
}

pub(crate) fn create_syn_message(&self) -> ChitchatMessage {
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let digest = self.compute_digest(&dead_nodes);
let digest = self.compute_digest();
ChitchatMessage::Syn {
cluster_id: self.config.cluster_id.clone(),
digest,
Expand All @@ -107,7 +106,7 @@ impl Chitchat {
}
// Ensure for every reply from this node, at least the heartbeat is changed.
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest(&dead_nodes);
let self_digest = self.compute_digest();
let empty_delta = Delta::default();
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE
- syn_ack_serialized_len(&self_digest, &empty_delta);
Expand Down Expand Up @@ -168,6 +167,7 @@ impl Chitchat {
}
} else {
self.failure_detector.report_unknown(chitchat_id);
self.failure_detector.update_node_liveness(chitchat_id);
}
}
}
Expand Down Expand Up @@ -280,8 +280,8 @@ impl Chitchat {
}

/// Computes the node's digest.
fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest {
self.cluster_state.compute_digest(dead_nodes)
fn compute_digest(&self) -> Digest {
self.cluster_state.compute_digest()
}

/// Subscribes a callback that will be called every time a key matching the supplied prefix
Expand Down
10 changes: 5 additions & 5 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,11 @@ mod tests {
test_transport.send(server_addr, syn_ack).await.unwrap();

// Wait for delta to ensure heartbeat key was incremented.
let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap();
let delta = if let ChitchatMessage::Ack { delta } = chitchat_message {
delta
} else {
panic!("Expected ack");
let delta = loop {
let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap();
if let ChitchatMessage::Ack { delta } = chitchat_message {
break delta;
};
};

let node_delta = &delta.node_deltas.get(&server_id).unwrap();
Expand Down
37 changes: 22 additions & 15 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,11 @@ impl ClusterState {
}
}

pub fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest {
pub fn compute_digest(&self) -> Digest {
Digest {
node_digests: self
.node_states
.iter()
.filter(|(chitchat_id, _)| !dead_nodes.contains(chitchat_id))
.map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest()))
.collect(),
}
Expand Down Expand Up @@ -366,21 +365,39 @@ impl ClusterState {
}
let mut delta_writer = DeltaWriter::with_mtu(mtu);

for chitchat_id in nodes_to_reset {
if !delta_writer.add_node_to_reset(chitchat_id.clone()) {
for chitchat_id in &nodes_to_reset {
if !delta_writer.add_node_to_reset((*chitchat_id).clone()) {
break;
}
}
for stale_node in stale_nodes.into_iter() {
if !delta_writer.add_node(stale_node.chitchat_id.clone(), stale_node.heartbeat) {
break;
}
let mut added_something = false;
for (key, versioned_value) in stale_node.stale_key_values() {
added_something = true;
if !delta_writer.add_kv(key, versioned_value.clone()) {
let delta: Delta = delta_writer.into();
return delta;
}
}
if !added_something && nodes_to_reset.contains(&stale_node.chitchat_id) {
// send a sentinel element to update the max_version. Otherwise the node's vision
// of max_version will be 0, and it may accept writes that are supposed to be
// stale, but it can tell they are.
if !delta_writer.add_kv(
"",
VersionedValue {
value: String::new(),
version: stale_node.node_state.max_version,
tombstone: Some(0),
},
) {
let delta: Delta = delta_writer.into();
return delta;
}
}
}
delta_writer.into()
}
Expand Down Expand Up @@ -817,23 +834,13 @@ mod tests {
node2_state.set("key_a", "");
node2_state.set("key_b", "");

let dead_nodes = HashSet::new();
let digest = cluster_state.compute_digest(&dead_nodes);
let digest = cluster_state.compute_digest();

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);
expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2);

assert_eq!(&digest, &expected_node_digests);

// Consider node 1 dead:
let dead_nodes = HashSet::from_iter([&node1]);
let digest = cluster_state.compute_digest(&dead_nodes);

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2);

assert_eq!(&digest, &expected_node_digests);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion chitchat/tests/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
},
// Relink node 3
Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()),
Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()),
Operation::AddNetworkLink(chitchat_id_2.clone(), chitchat_id_3.clone()),
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_3.clone(),
chitchat_id: chitchat_id_1.clone(),
Expand Down

0 comments on commit cc12036

Please sign in to comment.