Skip to content

Commit

Permalink
Improve sync committee gossip logging (#6640)
Browse files Browse the repository at this point in the history
Makes failures to gossip more visible, using the same throttling as applied to attestations.
  • Loading branch information
ajsutton committed Dec 22, 2022
1 parent ae33651 commit 8570b5f
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ For information on changes in released versions of Teku, see the [releases page]
- Added `finalized` metadata field to applicable REST API responses
- Use SSZ encoding for external validator client block creation requests by default. Can be disabled with `--beacon-node-ssz-blocks-enabled=false`.
- Added a timeout (2 minutes) when attempting to load the initial state from a URL
- Improved logging when sync committee messages fail to publish to the gossip network
- Support for the `/eth/v1/beacon/deposit_snapshot` REST API

### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public void assertMessagesInOrder(final Level level, final String... messages) {
assertThat(getMessages(level)).containsSubsequence(messages);
}

public void assertDebugLog(final String message) {
assertLogged(Level.DEBUG, message);
}

public void assertInfoLog(final String message) {
assertLogged(Level.INFO, message);
}
Expand Down Expand Up @@ -99,6 +103,10 @@ public static LogCaptor forClass(final Class<?> clazz) {
return new LogCaptor(loggerConfig, appender);
}

public void clearLogs() {
appender.logs.clear();
}

@Override
public void close() {
loggerConfig.removeAppender(appender.getName());
Expand Down
1 change: 1 addition & 0 deletions networking/eth2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
testImplementation project(':ethereum:networks')
testImplementation testFixtures(project(':infrastructure:async'))
testImplementation testFixtures(project(':infrastructure:collections'))
testImplementation testFixtures(project(':infrastructure:logging'))
testImplementation testFixtures(project(':infrastructure:time'))
testImplementation testFixtures(project(':networking:p2p'))
testImplementation testFixtures(project(':storage'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@

package tech.pegasys.teku.networking.eth2.gossip;

import com.google.common.base.Throwables;
import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationSubnetSubscriptions;
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;

public class AttestationGossipManager implements GossipManager {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -37,8 +31,7 @@ public class AttestationGossipManager implements GossipManager {
private final Counter attestationPublishSuccessCounter;
private final Counter attestationPublishFailureCounter;

private UInt64 lastErroredSlot;
private Throwable lastRootCause;
private final GossipFailureLogger gossipFailureLogger = new GossipFailureLogger("attestation");

public AttestationGossipManager(
final MetricsSystem metricsSystem,
Expand Down Expand Up @@ -69,40 +62,11 @@ public void onNewAttestation(final ValidateableAttestation validateableAttestati
attestationPublishSuccessCounter.inc();
},
error -> {
logWithSuppression(error, attestation);
gossipFailureLogger.logWithSuppression(error, attestation.getData().getSlot());
attestationPublishFailureCounter.inc();
});
}

synchronized void logWithSuppression(final Throwable error, final Attestation attestation) {
final AttestationData attestationData = attestation.getData();
final Throwable rootCause = Throwables.getRootCause(error);

final boolean suppress =
attestationData.getSlot().equals(lastErroredSlot)
&& rootCause.getClass().equals(lastRootCause.getClass());

lastErroredSlot = attestationData.getSlot();
lastRootCause = rootCause;

if (lastRootCause instanceof MessageAlreadySeenException) {
LOG.debug(
"Failed to publish attestation(s) for slot {} because the message has already been seen",
lastErroredSlot);
} else if (lastRootCause instanceof NoPeersForOutboundMessageException) {
LOG.log(
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish attestations(s) for slot {} because no peers were available on the required gossip topic",
lastErroredSlot);
} else {
LOG.log(
suppress ? Level.DEBUG : Level.ERROR,
"Failed to publish attestation(s) for slot {}",
lastErroredSlot,
error);
}
}

public void subscribeToSubnetId(final int subnetId) {
LOG.trace("Subscribing to subnet ID {}", subnetId);
subnetSubscriptions.subscribeToSubnetId(subnetId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.eth2.gossip;

import com.google.common.base.Throwables;
import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class GossipFailureLogger {
private static final Logger LOG = LogManager.getLogger();

private final String messageType;
private UInt64 lastErroredSlot;
private Throwable lastRootCause;

public GossipFailureLogger(final String messageType) {
this.messageType = messageType;
}

public synchronized void logWithSuppression(final Throwable error, final UInt64 slot) {
final Throwable rootCause = Throwables.getRootCause(error);

final boolean suppress =
slot.equals(lastErroredSlot) && rootCause.getClass().equals(lastRootCause.getClass());

lastErroredSlot = slot;
lastRootCause = rootCause;

if (lastRootCause instanceof MessageAlreadySeenException) {
LOG.debug(
"Failed to publish {}(s) for slot {} because the message has already been seen",
messageType,
lastErroredSlot);
} else if (lastRootCause instanceof NoPeersForOutboundMessageException) {
LOG.log(
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish {}(s) for slot {} because no peers were available on the required gossip topic",
messageType,
lastErroredSlot);
} else {
LOG.log(
suppress ? Level.DEBUG : Level.ERROR,
"Failed to publish {}(s) for slot {}",
messageType,
lastErroredSlot,
error);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class SyncCommitteeMessageGossipManager implements GossipManager {
private final Counter publishSuccessCounter;
private final Counter publishFailureCounter;

private final GossipFailureLogger gossipFailureLogger =
new GossipFailureLogger("sync committee message");

public SyncCommitteeMessageGossipManager(
final MetricsSystem metricsSystem,
final Spec spec,
Expand Down Expand Up @@ -106,8 +109,7 @@ private void publish(final SyncCommitteeMessage message, final int subnetId) {
publishSuccessCounter.inc();
},
error -> {
LOG.trace(
"Failed to publish sync committee message for slot {}", message.getSlot(), error);
gossipFailureLogger.logWithSuppression(error, message.getSlot());
publishFailureCounter.inc();
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.eth2.gossip;

import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
import org.junit.jupiter.api.Test;
import tech.pegasys.infrastructure.logging.LogCaptor;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

class GossipFailureLoggerTest {

public static final String ALREADY_SEEN_MESSAGE =
"Failed to publish thingy(s) for slot 1 because the message has already been seen";
public static final UInt64 SLOT = UInt64.ONE;
public static final String NO_PEERS_MESSAGE = noPeersMessage(SLOT.intValue());

public static final String GENERIC_FAILURE_MESSAGE = "Failed to publish thingy(s) for slot 1";

private final GossipFailureLogger logger = new GossipFailureLogger("thingy");

@Test
void shouldLogAlreadySeenErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new MessageAlreadySeenException("Dupe")), SLOT);
logCaptor.assertDebugLog(ALREADY_SEEN_MESSAGE);
}
}

@Test
void shouldLogFirstNoPeersErrorsAtWarningLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
}
}

@Test
void shouldLogRepeatedNoPeersErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
logCaptor.clearLogs();

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
SLOT);
logCaptor.assertDebugLog(NO_PEERS_MESSAGE);
}
}

@Test
void shouldLogNoPeersErrorsWithDifferentSlotsAtWarnLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
UInt64.valueOf(2));
logCaptor.assertWarnLog(noPeersMessage(2));
}
}

@Test
void shouldLogNoPeersErrorsAtWarnLevelWhenSeparatedByADifferentException() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new NoPeersForOutboundMessageException("So Lonely")), SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
logCaptor.clearLogs();

logger.logWithSuppression(new MessageAlreadySeenException("Dupe"), SLOT);

logger.logWithSuppression(
new IllegalStateException(
"Foo", new NoPeersForOutboundMessageException("Not a friend in the world")),
SLOT);
logCaptor.assertWarnLog(NO_PEERS_MESSAGE);
}
}

@Test
void shouldLogFirstGenericErrorAtErrorLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.assertErrorLog(GENERIC_FAILURE_MESSAGE);
}
}

@Test
void shouldLogRepeatedGenericErrorsAtDebugLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.clearLogs();

logger.logWithSuppression(
new IllegalStateException("Foo", new IllegalStateException("goes the dynamite")), SLOT);
logCaptor.assertDebugLog(GENERIC_FAILURE_MESSAGE);
}
}

@Test
void shouldLogMultipleGenericErrorsWithDifferentCausesAtErrorLevel() {
try (final LogCaptor logCaptor = LogCaptor.forClass(GossipFailureLogger.class)) {
logger.logWithSuppression(
new RuntimeException("Foo", new IllegalStateException("Boom")), SLOT);
logCaptor.assertErrorLog(GENERIC_FAILURE_MESSAGE);
logCaptor.clearLogs();

logger.logWithSuppression(
new IllegalStateException("Foo", new IllegalArgumentException("goes the dynamite")),
SLOT);
logCaptor.assertErrorLog(GENERIC_FAILURE_MESSAGE);
}
}

private static String noPeersMessage(final int slot) {
return "Failed to publish thingy(s) for slot "
+ slot
+ " because no peers were available on the required gossip topic";
}
}

0 comments on commit 8570b5f

Please sign in to comment.