Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: surface error to user when command topic deleted while server running #6240

Merged
merged 12 commits into from
Sep 23, 2020
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
stevenpyzhang committed Sep 21, 2020
commit deb88e15c93f03161dccbd45bc424d17fbe05f0f
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,6 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_METASTORE_BACKUP_LOCATION_DOC = "Specify the directory where "
+ "KSQL metastore backup files are located.";

public static final String KSQL_METASTORE_BACKUP_HARD_FAIL = "ksql.metastore.backup.hard.fail";
public static final Boolean KSQL_METASTORE_BACKUP_HARD_FAIL_DEFAULT = true;
public static final String KSQL_METASTORE_BACKUP_HARD_FAIL_DOC =
"Enable hard failing when the metastore backup and command topic are inconsistent. "
+ "The server will enter a degraded state if the command topic isn't present but the metastore "
+ "back up is present, or the command topic and the metastore have conflicting commands";

public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
public static final String KSQL_SUPPRESS_ENABLED_DOC =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Lists;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,12 +80,18 @@ public void start() {

public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration timeout) {
final Iterable<ConsumerRecord<byte[], byte[]>> iterable = commandConsumer.poll(timeout);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();

if (iterable != null) {
iterable.forEach(this::backupRecord);
iterable.forEach(record -> {
backupRecord(record);
if (!commandTopicBackup.commandTopicCorruption()) {
records.add(record);
}
});
}

return iterable;
return records;
}

public List<QueuedCommand> getRestoreCommands(final Duration duration) {
Expand All @@ -100,6 +107,10 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
log.debug("Received {} records from poll", records.count());
for (final ConsumerRecord<byte[], byte[]> record : records) {
backupRecord(record);

if (commandTopicBackup.commandTopicCorruption()) {
continue;
}

if (record.value() == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,5 @@ public interface CommandTopicBackup {

boolean commandTopicCorruption();

boolean backupExists();

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public long read() {
private int latestReplayIdx;
private boolean corruptionDetected;

public CommandTopicBackupImpl(final String location, final String topicName) {
public CommandTopicBackupImpl(
final String location,
final String topicName) {
this(location, topicName, CURRENT_MILLIS_TICKER);
}

Expand Down Expand Up @@ -149,9 +151,11 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
}

void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
// if (corruptionDetected) {
// LOG.warn("Failure to write command topic data to backup. Corruption detected in command topic.");
// }
if (corruptionDetected) {
LOG.warn("Failure to write command topic data to backup. "
+ "Corruption detected in command topic.");
return;
}

if (isRestoring()) {
if (isRecordInLatestReplay(record)) {
Expand All @@ -160,9 +164,7 @@ void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
} else {
LOG.info("Previous command topic backup does not match the new command topic data.");
corruptionDetected = true;
// createNewBackupFile();
// latestReplay.clear();
// LOG.info("New backup file created: {}", replayFile.getPath());
return;
}
} else if (latestReplay.size() > 0) {
// clear latest replay from memory
Expand All @@ -184,11 +186,6 @@ public boolean commandTopicCorruption() {
return corruptionDetected;
}

@Override
public boolean backupExists() {
return latestReplayFile().isPresent();
}

@VisibleForTesting
BackupReplayFile openOrCreateReplayFile() {
final Optional<BackupReplayFile> latestFile = latestReplayFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,4 @@ public void close() {
public boolean commandTopicCorruption() {
return false;
}

@Override
public boolean backupExists() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ public final class KsqlRestApplication implements Executable {
private final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();
private final QueryMonitor queryMonitor;
private final DenyListPropertyValidator denyListPropertyValidator;
private final CommandTopicBackup commandTopicBackup;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -225,8 +224,7 @@ public static SourceName getCommandsStreamName() {
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx,
final QueryMonitor ksqlQueryMonitor,
final DenyListPropertyValidator denyListPropertyValidator,
final CommandTopicBackup commandTopicBackup
final DenyListPropertyValidator denyListPropertyValidator
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand Down Expand Up @@ -255,8 +253,6 @@ public static SourceName getCommandsStreamName() {
this.vertx = requireNonNull(vertx, "vertx");
this.denyListPropertyValidator =
requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
this.commandTopicBackup =
requireNonNull(commandTopicBackup, "commandTopicBackup");

this.serverInfoResource =
new ServerInfoResource(serviceContext, ksqlConfigNoPort, commandRunner);
Expand Down Expand Up @@ -670,9 +666,10 @@ static KsqlRestApplication buildApplication(

commandTopicBackup = new CommandTopicBackupImpl(
ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION),
commandTopicName)
;
commandTopicName
);
}
final CommandTopicBackup finalCommandTopicBackup = commandTopicBackup;

final CommandStore commandStore = CommandStore.Factory.create(
ksqlConfig,
Expand All @@ -682,7 +679,7 @@ static KsqlRestApplication buildApplication(
restConfig.getCommandConsumerProperties()),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties()),
commandTopicBackup
finalCommandTopicBackup
);

final InteractiveStatementExecutor statementExecutor =
Expand Down Expand Up @@ -757,7 +754,9 @@ static KsqlRestApplication buildApplication(
Duration.ofMillis(restConfig.getLong(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class)
InternalTopicSerdes.deserializer(Command.class),
finalCommandTopicBackup,
errorHandler
);

final KsqlResource ksqlResource = new KsqlResource(
Expand Down Expand Up @@ -810,8 +809,7 @@ static KsqlRestApplication buildApplication(
lagReportingAgent,
vertx,
queryMonitor,
denyListPropertyValidator,
commandTopicBackup
denyListPropertyValidator
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ void ensureConsumedPast(long seqNum, Duration timeout)
*/
boolean isEmpty();

/**
* @return whether or not data corruption is detected in the enqueued comamnds.
*/
boolean isCorrupted();

/**
* Cause any blocked {@link #getNewCommands(Duration)} calls to return early.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -39,6 +41,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -69,23 +73,31 @@ public class CommandRunner implements Closeable {
private final ClusterTerminator clusterTerminator;
private final ServerState serverState;

private final CommandRunnerStatusMetric commandRunnerStatusMetric;
private final CommandRunnerMetrics commandRunnerMetric;
private final AtomicReference<Pair<QueuedCommand, Instant>> currentCommandRef;
private final AtomicReference<Instant> lastPollTime;
private final Duration commandRunnerHealthTimeout;
private final Clock clock;

private final Deserializer<Command> commandDeserializer;
private final Consumer<QueuedCommand> incompatibleCommandChecker;
private final Supplier<Boolean> backupCorrupted;
private final Errors errorHandler;
private boolean incompatibleCommandDetected;

public enum CommandRunnerStatus {
RUNNING,
ERROR,
DEGRADED,
DEGRADED
}

public enum CommandRunnerDegradedReason {
NONE,
INCOMPATIBLE_COMMAND,
CORRUPTED
}

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public CommandRunner(
final InteractiveStatementExecutor statementExecutor,
final CommandQueue commandStore,
Expand All @@ -95,7 +107,9 @@ public CommandRunner(
final String ksqlServiceId,
final Duration commandRunnerHealthTimeout,
final String metricsGroupPrefix,
final Deserializer<Command> commandDeserializer
final Deserializer<Command> commandDeserializer,
final CommandTopicBackup commandTopicBackup,
final Errors errorHandler
) {
this(
statementExecutor,
Expand All @@ -113,7 +127,9 @@ public CommandRunner(
queuedCommand.getAndDeserializeCommandId();
queuedCommand.getAndDeserializeCommand(commandDeserializer);
},
commandDeserializer
commandDeserializer,
commandTopicBackup::commandTopicCorruption,
errorHandler
);
}

Expand All @@ -132,7 +148,9 @@ public CommandRunner(
final Clock clock,
final Function<List<QueuedCommand>, List<QueuedCommand>> compactor,
final Consumer<QueuedCommand> incompatibleCommandChecker,
final Deserializer<Command> commandDeserializer
final Deserializer<Command> commandDeserializer,
final Supplier<Boolean> backupCorrupted,
final Errors errorHandler
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.statementExecutor = Objects.requireNonNull(statementExecutor, "statementExecutor");
Expand All @@ -145,14 +163,18 @@ public CommandRunner(
Objects.requireNonNull(commandRunnerHealthTimeout, "commandRunnerHealthTimeout");
this.currentCommandRef = new AtomicReference<>(null);
this.lastPollTime = new AtomicReference<>(null);
this.commandRunnerStatusMetric =
new CommandRunnerStatusMetric(ksqlServiceId, this, metricsGroupPrefix);
this.commandRunnerMetric =
new CommandRunnerMetrics(ksqlServiceId, this, metricsGroupPrefix);
this.clock = Objects.requireNonNull(clock, "clock");
this.compactor = Objects.requireNonNull(compactor, "compactor");
this.incompatibleCommandChecker =
Objects.requireNonNull(incompatibleCommandChecker, "incompatibleCommandChecker");
this.commandDeserializer =
Objects.requireNonNull(commandDeserializer, "commandDeserializer");
this.backupCorrupted =
Objects.requireNonNull(backupCorrupted, "backupCorrupted");
this.errorHandler =
Objects.requireNonNull(errorHandler, "errorHandler");
this.incompatibleCommandDetected = false;
}

Expand All @@ -173,7 +195,7 @@ public void close() {
if (!closed) {
closeEarly();
}
commandRunnerStatusMetric.close();
commandRunnerMetric.close();
}

/**
Expand Down Expand Up @@ -312,11 +334,7 @@ private void terminateCluster(final Command command) {
}

public CommandRunnerStatus checkCommandRunnerStatus() {
if (commandStore.isCorrupted()) {
return CommandRunnerStatus.CORRUPTED;
}

if (incompatibleCommandDetected) {
if (incompatibleCommandDetected || backupCorrupted.get()) {
return CommandRunnerStatus.DEGRADED;
}

Expand All @@ -333,6 +351,30 @@ public CommandRunnerStatus checkCommandRunnerStatus() {
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

public CommandRunnerDegradedReason getCommandRunnerDegradedReason() {
if (backupCorrupted.get()) {
return CommandRunnerDegradedReason.CORRUPTED;
}

if (incompatibleCommandDetected) {
return CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND;
}

return CommandRunnerDegradedReason.NONE;
}

public String getCommandRunnerDegradedWarning() {
if (backupCorrupted.get()) {
return errorHandler.commandRunnerDegradedBackupCorruptedErrorMessage();
}

if (incompatibleCommandDetected) {
return errorHandler.commandRunnerDegradedIncompatibleCommandsErrorMessage();
}

return "";
}

private List<QueuedCommand> checkForIncompatibleCommands(final List<QueuedCommand> commands) {
final List<QueuedCommand> compatibleCommands = new ArrayList<>();
try {
Expand All @@ -357,8 +399,9 @@ private class Runner implements Runnable {
public void run() {
try {
while (!closed) {
if (incompatibleCommandDetected) {
LOG.warn("CommandRunner entering degraded state after failing to deserialize command");
if (incompatibleCommandDetected || backupCorrupted.get()) {
LOG.warn("CommandRunner entering degraded state due to: {}",
getCommandRunnerDegradedReason());
closeEarly();
} else {
LOG.trace("Polling for new writes to command topic");
Expand Down
Loading