Skip to content

Commit

Permalink
Use thread safe datastructures for performance tracking (Consensys#4186)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton committed Jul 27, 2021
1 parent 77bd3a2 commit 6f60c52
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For information on changes in released versions of Teku, see the [releases page]
- Fix an issue where deposits for the PoW chain could be loaded out of order on restart.
- Fix `/eth/v1/validator/contribution_and_proofs` to return errors.
- Add `SYNC_COMMITTEE_SUBNET_COUNT` to `/eth/v1/config/spec`, as it was missing.
- Fixed `ConcurrentModificationException` in validator performance reporting.
- Upgraded the discovery library, providing better memory management and standards compliance.

### Experimental: New Altair REST APIs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IntSummaryStatistics;
Expand All @@ -27,6 +28,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -49,11 +52,14 @@
public class DefaultPerformanceTracker implements PerformanceTracker {

@VisibleForTesting
final NavigableMap<UInt64, Set<SlotAndBlockRoot>> producedBlocksByEpoch = new TreeMap<>();
final NavigableMap<UInt64, Set<SlotAndBlockRoot>> producedBlocksByEpoch =
new ConcurrentSkipListMap<>();

final NavigableMap<UInt64, Set<Attestation>> producedAttestationsByEpoch = new TreeMap<>();
final NavigableMap<UInt64, Set<Attestation>> producedAttestationsByEpoch =
new ConcurrentSkipListMap<>();

final NavigableMap<UInt64, AtomicInteger> blockProductionAttemptsByEpoch = new TreeMap<>();
final NavigableMap<UInt64, AtomicInteger> blockProductionAttemptsByEpoch =
new ConcurrentSkipListMap<>();

@VisibleForTesting
static final UInt64 BLOCK_PERFORMANCE_EVALUATION_INTERVAL = UInt64.valueOf(2); // epochs
Expand Down Expand Up @@ -225,7 +231,7 @@ private AttestationPerformance getAttestationPerformanceForEpoch(

// Get sent attestations in range
Set<Attestation> producedAttestations =
producedAttestationsByEpoch.getOrDefault(analyzedEpoch, new HashSet<>());
producedAttestationsByEpoch.getOrDefault(analyzedEpoch, Collections.emptySet());
BeaconState state = combinedChainDataClient.getBestState().orElseThrow();

int correctTargetCount = 0;
Expand Down Expand Up @@ -327,15 +333,15 @@ private Map<UInt64, List<Attestation>> getAttestationsIncludedInEpochs(
public void saveProducedAttestation(Attestation attestation) {
UInt64 epoch = spec.computeEpochAtSlot(attestation.getData().getSlot());
Set<Attestation> attestationsInEpoch =
producedAttestationsByEpoch.computeIfAbsent(epoch, __ -> new HashSet<>());
producedAttestationsByEpoch.computeIfAbsent(epoch, __ -> concurrentSet());
attestationsInEpoch.add(attestation);
}

@Override
public void saveProducedBlock(SignedBeaconBlock block) {
UInt64 epoch = spec.computeEpochAtSlot(block.getSlot());
Set<SlotAndBlockRoot> blocksInEpoch =
producedBlocksByEpoch.computeIfAbsent(epoch, __ -> new HashSet<>());
producedBlocksByEpoch.computeIfAbsent(epoch, __ -> concurrentSet());
blocksInEpoch.add(new SlotAndBlockRoot(block.getSlot(), block.getRoot()));
}

Expand Down Expand Up @@ -363,4 +369,8 @@ public void saveProducedSyncCommitteeMessage(final SyncCommitteeMessage message)
static long getPercentage(final long numerator, final long denominator) {
return (long) (numerator * 100.0 / denominator + 0.5);
}

private <T> Set<T> concurrentSet() {
return Collections.newSetFromMap(new ConcurrentHashMap<>());
}
}

0 comments on commit 6f60c52

Please sign in to comment.