Skip to content

Commit

Permalink
feat: surface error to user when command topic deleted while server r…
Browse files Browse the repository at this point in the history
…unning
  • Loading branch information
stevenpyzhang committed Sep 18, 2020
1 parent af28560 commit 39235d4
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,9 @@ static KsqlRestApplication buildApplication(
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class),
finalCommandTopicBackup,
errorHandler
errorHandler,
serviceContext.getTopicClient(),
commandTopicName
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
Expand All @@ -43,6 +44,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -81,9 +83,11 @@ public class CommandRunner implements Closeable {

private final Deserializer<Command> commandDeserializer;
private final Consumer<QueuedCommand> incompatibleCommandChecker;
private final Supplier<Boolean> backupCorrupted;
private final Errors errorHandler;
private boolean incompatibleCommandDetected;
private final Supplier<Boolean> backupCorrupted;
private final Supplier<Boolean> commandTopicExists;
private boolean commandTopicDeleted;

public enum CommandRunnerStatus {
RUNNING,
Expand All @@ -94,7 +98,8 @@ public enum CommandRunnerStatus {
public enum CommandRunnerDegradedReason {
NONE,
INCOMPATIBLE_COMMAND,
CORRUPTED
CORRUPTED,
COMMAND_TOPIC_DELETED
}

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
Expand All @@ -109,7 +114,9 @@ public CommandRunner(
final String metricsGroupPrefix,
final Deserializer<Command> commandDeserializer,
final CommandTopicBackup commandTopicBackup,
final Errors errorHandler
final Errors errorHandler,
final KafkaTopicClient kafkaTopicClient,
final String commandTopicName
) {
this(
statementExecutor,
Expand All @@ -129,7 +136,8 @@ public CommandRunner(
},
commandDeserializer,
commandTopicBackup::commandTopicCorruption,
errorHandler
errorHandler,
() -> kafkaTopicClient.isTopicExists(commandTopicName)
);
}

Expand All @@ -150,7 +158,8 @@ public CommandRunner(
final Consumer<QueuedCommand> incompatibleCommandChecker,
final Deserializer<Command> commandDeserializer,
final Supplier<Boolean> backupCorrupted,
final Errors errorHandler
final Errors errorHandler,
final Supplier<Boolean> commandTopicExists
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
Expand All @@ -175,7 +184,10 @@ public CommandRunner(
Objects.requireNonNull(backupCorrupted, "backupCorrupted");
this.errorHandler =
Objects.requireNonNull(errorHandler, "errorHandler");
this.commandTopicExists =
Objects.requireNonNull(commandTopicExists, "commandTopicExists");
this.incompatibleCommandDetected = false;
this.commandTopicDeleted = false;
}

/**
Expand Down Expand Up @@ -265,6 +277,9 @@ void fetchAndRunCommands() {
lastPollTime.set(clock.instant());
final List<QueuedCommand> commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT);
if (commands.isEmpty()) {
if (!commandTopicExists.get()) {
commandTopicDeleted = true;
}
return;
}

Expand Down Expand Up @@ -334,7 +349,7 @@ private void terminateCluster(final Command command) {
}

public CommandRunnerStatus checkCommandRunnerStatus() {
if (incompatibleCommandDetected || backupCorrupted.get()) {
if (incompatibleCommandDetected || backupCorrupted.get() || commandTopicDeleted) {
return CommandRunnerStatus.DEGRADED;
}

Expand All @@ -360,6 +375,10 @@ public CommandRunnerDegradedReason getCommandRunnerDegradedReason() {
return CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND;
}

if (commandTopicDeleted) {
return CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED;
}

return CommandRunnerDegradedReason.NONE;
}

Expand All @@ -372,6 +391,10 @@ public String getCommandRunnerDegradedWarning() {
return errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage();
}

if (commandTopicDeleted) {
return errorHandler.commandRunnerDegradedCommandTopicDeletedErrorMessage();
}

return "";
}

Expand Down Expand Up @@ -399,7 +422,7 @@ private class Runner implements Runnable {
public void run() {
try {
while (!closed) {
if (incompatibleCommandDetected || backupCorrupted.get()) {
if (incompatibleCommandDetected || backupCorrupted.get() || commandTopicDeleted) {
LOG.warn("CommandRunner entering degraded state due to: {}",
getCommandRunnerDegradedReason());
closeEarly();
Expand All @@ -412,6 +435,9 @@ public void run() {
if (!closed) {
throw wue;
}
} catch (final OffsetOutOfRangeException e) {
LOG.warn("The command topic offset was reset. CommandRunner thread exiting.");
commandTopicDeleted = true;
} finally {
commandStore.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.confluent.ksql.rest.integration;

public class CommandTopicBackupImplFunctionalTest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
Expand All @@ -72,6 +74,7 @@ public class CommandRunnerTest {
private static final long COMMAND_RUNNER_HEALTH_TIMEOUT = 1000;
private static final String BACKUP_CORRUPTED_ERROR_MESSAGE = "corrupted";
private static final String INCOMPATIBLE_COMMANDS_ERROR_MESSAGE = "incompatible";
private static final String MISSING_COMMAND_TOPIC_ERROR_MESSAGE = "command topic missing";

@Mock
private InteractiveStatementExecutor statementExecutor;
Expand Down Expand Up @@ -106,6 +109,8 @@ public class CommandRunnerTest {
@Mock
private Supplier<Boolean> backupCorrupted;
@Mock
private Supplier<Boolean> commandTopicExists;
@Mock
private Errors errorHandler;
@Captor
private ArgumentCaptor<Runnable> threadTaskCaptor;
Expand All @@ -128,9 +133,11 @@ public void setup() {
doNothing().when(incompatibleCommandChecker).accept(queuedCommand3);

when(backupCorrupted.get()).thenReturn(false);
when(commandTopicExists.get()).thenReturn(true);
when(compactor.apply(any())).thenAnswer(inv -> inv.getArgument(0));
when(errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage()).thenReturn(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE);
when(errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage()).thenReturn(BACKUP_CORRUPTED_ERROR_MESSAGE);
when(errorHandler.commandRunnerDegradedCommandTopicDeletedErrorMessage()).thenReturn(MISSING_COMMAND_TOPIC_ERROR_MESSAGE);

givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);

Expand All @@ -149,7 +156,8 @@ public void setup() {
incompatibleCommandChecker,
commandDeserializer,
backupCorrupted,
errorHandler
errorHandler,
commandTopicExists
);
}

Expand Down Expand Up @@ -305,6 +313,20 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInFetch() {
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

@Test
public void shouldEnterDegradedStateIfCommandTopicMissing() {
// Given:
givenQueuedCommands();
when(commandTopicExists.get()).thenReturn(false);

// When:
commandRunner.fetchAndRunCommands();

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(MISSING_COMMAND_TOPIC_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.COMMAND_TOPIC_DELETED));
}

@Test
public void shouldNotProcessCommandTopicIfBackupCorrupted() throws InterruptedException {
// Given:
Expand Down Expand Up @@ -517,6 +539,25 @@ public void shouldCloseEarlyWhenSerializationExceptionInFetch() throws Exception
inOrder.verify(commandStore).close();
}

public void shouldCloseEarlyWhenOffsetOutOfRangeException() throws Exception {
// Given:
when(commandStore.getNewCommands(any()))
.thenReturn(Collections.singletonList(queuedCommand1))
.thenThrow(new OffsetOutOfRangeException(Collections.singletonMap(new TopicPartition("command_topic", 0), 0L)));

// When:
commandRunner.start();
verify(commandStore, never()).close();
final Runnable threadTask = getThreadTask();
threadTask.run();

// Then:
final InOrder inOrder = inOrder(executor, commandStore);
inOrder.verify(commandStore).wakeup();
inOrder.verify(executor).awaitTermination(anyLong(), any());
inOrder.verify(commandStore).close();
}

@Test
public void shouldCloseTheCommandRunnerCorrectly() throws Exception {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.confluent.ksql.rest.entity.CommandId.Type;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.CommandTopicBackupNoOp;
import io.confluent.ksql.rest.server.resources.KsqlResource;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -70,6 +69,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -233,7 +233,9 @@ private class KsqlServer {
"",
InternalTopicSerdes.deserializer(Command.class),
commandTopicBackup,
errorHandler
errorHandler,
topicClient,
"command_topic"
);

this.ksqlResource = new KsqlResource(
Expand Down Expand Up @@ -569,6 +571,7 @@ private void shouldRecover(final List<QueuedCommand> commands) {
@Before
public void setUp() {
topicClient.preconditionTopicExists("A");
topicClient.preconditionTopicExists("command_topic");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class DefaultErrorMessages implements ErrorMessages {
+ System.lineSeparator()
+ "The server must be restarted after performing either operation in order to resume "
+ "normal functionality";

public static final String COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED =
"The server is in a degraded state due to deletion of the command topic. "
+ "DDL statements will not be processed."
+ System.lineSeparator()
+ "Restart the server to restore server functionality.";


@Override
Expand Down Expand Up @@ -73,4 +79,9 @@ public String commandRunnerDegradedIncompatibleCommandsErrorMessage() {
public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_BACKUP_CORRUPTED_ERROR_MESSAGE;
}

@Override
public String commandRunnerDegradedCommandTopicDeletedErrorMessage() {
return COMMAND_RUNNER_DEGRADED_COMMAND_TOPIC_DELETED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ public interface ErrorMessages {
String commandRunnerDegradedIncompatibleCommandsErrorMessage();

String commandRunnerDegradedBackupCorruptedErrorMessage();

String commandRunnerDegradedCommandTopicDeletedErrorMessage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ public String commandRunnerDegradedBackupCorruptedErrorMessage() {
return errorMessages.commandRunnerDegradedBackupCorruptedErrorMessage();
}

public String commandRunnerDegradedCommandTopicDeletedErrorMessage() {
return errorMessages.commandRunnerDegradedCommandTopicDeletedErrorMessage();
}

public EndpointResponse generateResponse(
final Exception e,
final EndpointResponse defaultResponse
Expand Down

0 comments on commit 39235d4

Please sign in to comment.