Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve execution payload retrying #5941

Merged
merged 7 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Replace ReexecutingExecutionPayloadBlockManager with FailedExecutionP…
…ool.

Only retries a single block at a time and retries less often after repeated failures.
  • Loading branch information
ajsutton committed Jul 18, 2022
commit 09552ad0a325f1f7756267f695fd01c20fa80de3
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class BlockManager extends Service
LimitedMap.createSynchronized(500);
private final Subscribers<ImportedBlockListener> receivedBlockSubscribers =
Subscribers.create(true);
private final Subscribers<FailedPayloadExecutionSubscriber> failedPayloadExecutionSubscribers =
Subscribers.create(true);

private final Optional<BlockImportMetrics> blockImportMetrics;

Expand Down Expand Up @@ -146,6 +148,10 @@ private void notifyReceivedBlockSubscribers(
s -> s.onBlockImported(signedBeaconBlock, executionOptimistic));
}

public void subscribeFailedPayloadExecution(final FailedPayloadExecutionSubscriber subscriber) {
failedPayloadExecutionSubscribers.subscribe(subscriber);
}

@Override
public void onBlockImported(final SignedBeaconBlock block) {
// Check if any pending blocks can now be imported
Expand Down Expand Up @@ -240,11 +246,15 @@ private SafeFuture<BlockImportResult> handleBlockImport(
break;
case FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING:
LOG.warn("Unable to import block: Execution Client is still syncing");
failedPayloadExecutionSubscribers.deliver(
FailedPayloadExecutionSubscriber::onPayloadExecutionFailed, block);
break;
case FAILED_EXECUTION_PAYLOAD_EXECUTION:
LOG.error(
"Unable to import block: Execution Client communication error.",
result.getFailureCause().orElse(null));
failedPayloadExecutionSubscribers.deliver(
FailedPayloadExecutionSubscriber::onPayloadExecutionFailed, block);
break;
default:
LOG.trace(
Expand Down Expand Up @@ -283,4 +293,8 @@ private void lateBlockImportCheck(
blockImportPerformance ->
blockImportPerformance.processingComplete(eventLogger, block, blockImportResult));
}

public interface FailedPayloadExecutionSubscriber {
void onPayloadExecutionFailed(SignedBeaconBlock block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public FailedExecutionPool(final BlockManager blockManager, final AsyncRunner as
this.asyncRunner = asyncRunner;
}

public synchronized void addFailedBlock(
final SignedBeaconBlock block, final BlockImportResult importResult) {
public synchronized void addFailedBlock(final SignedBeaconBlock block) {
if (retryingBlock.isEmpty()) {
retryingBlock = Optional.of(block);
scheduleNextRetry(block);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class FailedExecutionPoolTest {
@Test
void shouldRetryExecutionAfterShortDelay() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

assertThat(asyncRunner.hasDelayedActions()).isTrue();

Expand All @@ -69,7 +69,7 @@ void shouldRetryExecutionAfterShortDelay() {
void shouldContinueRetryingWhenExecutionFailsAgain() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

assertThat(asyncRunner.hasDelayedActions()).isTrue();
asyncRunner.executeQueuedActions();
Expand All @@ -84,7 +84,7 @@ void shouldContinueRetryingWhenExecutionFailsAgain() {
void shouldStopRetryingWhenBlockImports() {
withImportResult(BlockImportResult.successful(block));

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

assertThat(asyncRunner.hasDelayedActions()).isTrue();
asyncRunner.executeQueuedActions();
Expand All @@ -98,7 +98,7 @@ void shouldStopRetryingWhenPayloadFoundToBeInvalid() {
withImportResult(
BlockImportResult.failedStateTransition(new IllegalStateException("Invalid payload")));

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

assertThat(asyncRunner.hasDelayedActions()).isTrue();
asyncRunner.executeQueuedActions();
Expand All @@ -111,7 +111,7 @@ void shouldStopRetryingWhenPayloadFoundToBeInvalid() {
void shouldStopRetryingWhenBlockCanBeOptimisticallyImported() {
withImportResult(BlockImportResult.optimisticallySuccessful(block));

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

assertThat(asyncRunner.hasDelayedActions()).isTrue();
asyncRunner.executeQueuedActions();
Expand All @@ -124,7 +124,7 @@ void shouldStopRetryingWhenBlockCanBeOptimisticallyImported() {
void shouldWaitLongerBetweenEachRetry() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

timeProvider.advanceTimeBy(FailedExecutionPool.SHORT_DELAY);
asyncRunner.executeDueActions();
Expand All @@ -146,7 +146,7 @@ void shouldResetRetryTimeOnSuccessfulResponse() {
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

timeProvider.advanceTimeBy(FailedExecutionPool.SHORT_DELAY);
asyncRunner.executeDueActions();
Expand All @@ -159,7 +159,7 @@ void shouldResetRetryTimeOnSuccessfulResponse() {
verify(blockManager, times(2)).importBlock(block);

// New block fails and should be retried with a short timeout again
failurePool.addFailedBlock(block2, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block2);
timeProvider.advanceTimeBy(FailedExecutionPool.SHORT_DELAY);
asyncRunner.executeDueActions();
verify(blockManager).importBlock(block2);
Expand All @@ -170,8 +170,8 @@ void shouldOnlyRetryOneBlockAtATime() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager, times(1)).importBlock(any());
Expand All @@ -182,8 +182,8 @@ void shouldRetryNextBlockAfterFirstOneNoLongerNeedsRetrying() {
withImportResult(BlockImportResult.optimisticallySuccessful(block));
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager).importBlock(block);
Expand All @@ -194,7 +194,7 @@ void shouldRetryNextBlockAfterFirstOneNoLongerNeedsRetrying() {
@Test
void shouldLimitMaximumRetryDelayForSyncingResponses() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);

Duration expectedDelay = FailedExecutionPool.SHORT_DELAY;
for (int i = 0; i < 5; i++) {
Expand All @@ -213,7 +213,7 @@ void shouldLimitMaximumRetryDelayForSyncingResponses() {
@Test
void shouldLimitMaximumRetryDelayForTimeoutResponses() {
withImportResult(FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block, FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block);

Duration expectedDelay = FailedExecutionPool.SHORT_DELAY;
for (int i = 0; i < 5; i++) {
Expand All @@ -232,7 +232,7 @@ void shouldLimitMaximumRetryDelayForTimeoutResponses() {
@Test
void shouldLimitMaximumRetryDelayForFailureResponses() {
withImportResult(FAILED_EXECUTION_ERROR);
failurePool.addFailedBlock(block, FAILED_EXECUTION_ERROR);
failurePool.addFailedBlock(block);

Duration expectedDelay = FailedExecutionPool.SHORT_DELAY;
for (int i = 0; i < 5; i++) {
Expand All @@ -253,8 +253,8 @@ void shouldRetrySameBlockOnTimeoutResponse() {
withImportResult(FAILED_EXECUTION_TIMEOUT);
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);

failurePool.addFailedBlock(block, FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager).importBlock(block);
Expand All @@ -268,8 +268,8 @@ void shouldRetryDifferentBlockOnSyncingResponse() {
withImportResult(FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);

failurePool.addFailedBlock(block, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager).importBlock(block);
Expand All @@ -283,8 +283,8 @@ void shouldRetryDifferentBlockOnFailureResponse() {
withImportResult(FAILED_EXECUTION_ERROR);
final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2);

failurePool.addFailedBlock(block, FAILED_EXECUTION_ERROR);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_ERROR);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager).importBlock(block);
Expand All @@ -300,8 +300,8 @@ void shouldStopRetryingBlockWhenImportThrowsExceptionInsteadOfReturningFailedFut
when(blockManager.importBlock(block2))
.thenReturn(SafeFuture.completedFuture(BlockImportResult.successful(block2)));

failurePool.addFailedBlock(block, FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block2, FAILED_EXECUTION_TIMEOUT);
failurePool.addFailedBlock(block);
failurePool.addFailedBlock(block2);

asyncRunner.executeQueuedActions();
verify(blockManager).importBlock(block);
Expand Down
Loading