Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: surface error to user when command topic deleted while server running #6240

Merged
merged 12 commits into from
Sep 23, 2020
Next Next commit
test commit
  • Loading branch information
stevenpyzhang committed Sep 21, 2020
commit 2e06bfc99fa013c42da60acbf6e6c0d1c7d759b4
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,13 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_METASTORE_BACKUP_LOCATION_DOC = "Specify the directory where "
+ "KSQL metastore backup files are located.";

public static final String KSQL_METASTORE_BACKUP_HARD_FAIL = "ksql.metastore.backup.hard.fail";
public static final Boolean KSQL_METASTORE_BACKUP_HARD_FAIL_DEFAULT = true;
public static final String KSQL_METASTORE_BACKUP_HARD_FAIL_DOC =
"Enable hard failing when the metastore backup and command topic are inconsistent. "
+ "The server will enter a degraded state if the command topic isn't present but the metastore "
+ "back up is present, or the command topic and the metastore have conflicting commands";

public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
public static final String KSQL_SUPPRESS_ENABLED_DOC =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ public interface CommandTopicBackup {

void writeRecord(ConsumerRecord<byte[], byte[]> record);

boolean commandTopicCorruption();

boolean backupExists();

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public long read() {
private BackupReplayFile replayFile;
private List<Pair<CommandId, Command>> latestReplay;
private int latestReplayIdx;
private boolean corruptionDetected;

public CommandTopicBackupImpl(final String location, final String topicName) {
this(location, topicName, CURRENT_MILLIS_TICKER);
Expand Down Expand Up @@ -92,6 +93,7 @@ public void initialize() {
}

latestReplayIdx = 0;
corruptionDetected = false;
LOG.info("Command topic will be backup on file: {}", replayFile.getPath());
}

Expand Down Expand Up @@ -147,16 +149,20 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
}

void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
// if (corruptionDetected) {
// LOG.warn("Failure to write command topic data to backup. Corruption detected in command topic.");
// }

if (isRestoring()) {
if (isRecordInLatestReplay(record)) {
// Ignore backup because record was already replayed
return;
} else {
LOG.info("Previous command topic backup does not match the new command topic data. "
+ "A new backup file will be created.");
createNewBackupFile();
latestReplay.clear();
LOG.info("New backup file created: {}", replayFile.getPath());
LOG.info("Previous command topic backup does not match the new command topic data.");
corruptionDetected = true;
// createNewBackupFile();
// latestReplay.clear();
// LOG.info("New backup file created: {}", replayFile.getPath());
}
} else if (latestReplay.size() > 0) {
// clear latest replay from memory
Expand All @@ -173,26 +179,14 @@ void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
}
}

private void createNewBackupFile() {
try {
replayFile.close();
} catch (IOException e) {
LOG.warn("Couldn't close the current backup file {}. Error = {}",
replayFile.getPath(), e.getMessage());
}

replayFile = newReplayFile();
@Override
public boolean commandTopicCorruption() {
return corruptionDetected;
}

if (latestReplay.size() > 0 && latestReplayIdx > 0) {
try {
replayFile.write(latestReplay.subList(0, latestReplayIdx));
} catch (final IOException e) {
LOG.warn("Couldn't write the latest replayed commands to the new backup file {}. "
+ "Make sure the file exists and has permissions to write. "
+ "KSQL must be restarted afterwards to complete the backup process. Error = {}",
replayFile.getPath(), e.getMessage());
}
}
@Override
public boolean backupExists() {
return latestReplayFile().isPresent();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,14 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
public void close() {
// no-op
}

@Override
public boolean commandTopicCorruption() {
return false;
}

@Override
public boolean backupExists() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public final class KsqlRestApplication implements Executable {
private final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();
private final QueryMonitor queryMonitor;
private final DenyListPropertyValidator denyListPropertyValidator;
private final CommandTopicBackup commandTopicBackup;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -224,7 +225,8 @@ public static SourceName getCommandsStreamName() {
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx,
final QueryMonitor ksqlQueryMonitor,
final DenyListPropertyValidator denyListPropertyValidator
final DenyListPropertyValidator denyListPropertyValidator,
final CommandTopicBackup commandTopicBackup
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand Down Expand Up @@ -253,6 +255,8 @@ public static SourceName getCommandsStreamName() {
this.vertx = requireNonNull(vertx, "vertx");
this.denyListPropertyValidator =
requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
this.commandTopicBackup =
requireNonNull(commandTopicBackup, "commandTopicBackup");

this.serverInfoResource =
new ServerInfoResource(serviceContext, ksqlConfigNoPort, commandRunner);
Expand Down Expand Up @@ -656,14 +660,29 @@ static KsqlRestApplication buildApplication(

final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig);

CommandTopicBackup commandTopicBackup = new CommandTopicBackupNoOp();
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ENABLE_METASTORE_BACKUP)) {
if (ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION).isEmpty()) {
throw new KsqlException(String.format("Metastore backups is enabled, but location "
+ "is empty. Please specify the location with the property '%s'",
KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION));
}

commandTopicBackup = new CommandTopicBackupImpl(
ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION),
commandTopicName)
;
}

final CommandStore commandStore = CommandStore.Factory.create(
ksqlConfig,
commandTopicName,
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandConsumerProperties()),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties())
restConfig.getCommandProducerProperties()),
commandTopicBackup
);

final InteractiveStatementExecutor statementExecutor =
Expand Down Expand Up @@ -791,7 +810,8 @@ static KsqlRestApplication buildApplication(
lagReportingAgent,
vertx,
queryMonitor,
denyListPropertyValidator
denyListPropertyValidator,
commandTopicBackup
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ void ensureConsumedPast(long seqNum, Duration timeout)
*/
boolean isEmpty();

/**
* @return whether or not data corruption is detected in the enqueued comamnds.
*/
boolean isCorrupted();

/**
* Cause any blocked {@link #getNewCommands(Duration)} calls to return early.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public class CommandRunner implements Closeable {
public enum CommandRunnerStatus {
RUNNING,
ERROR,
DEGRADED
DEGRADED,
CORRUPTED
}

public CommandRunner(
Expand Down Expand Up @@ -311,6 +312,10 @@ private void terminateCluster(final Command command) {
}

public CommandRunnerStatus checkCommandRunnerStatus() {
if (commandStore.isCorrupted()) {
return CommandRunnerStatus.CORRUPTED;
}

if (incompatibleCommandDetected) {
return CommandRunnerStatus.DEGRADED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class CommandStore implements CommandQueue, Closeable {
private final Serializer<CommandId> commandIdSerializer;
private final Serializer<Command> commandSerializer;
private final Deserializer<CommandId> commandIdDeserializer;
private final CommandTopicBackup commandTopicBackup;


public static final class Factory {
Expand All @@ -88,7 +89,8 @@ public static CommandStore create(
final String commandTopicName,
final Duration commandQueueCatchupTimeout,
final Map<String, Object> kafkaConsumerProperties,
final Map<String, Object> kafkaProducerProperties
final Map<String, Object> kafkaProducerProperties,
final CommandTopicBackup commandTopicBackup
) {
kafkaConsumerProperties.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
Expand All @@ -107,20 +109,6 @@ public static CommandStore create(
"all"
);

CommandTopicBackup commandTopicBackup = new CommandTopicBackupNoOp();
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ENABLE_METASTORE_BACKUP)) {
if (ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION).isEmpty()) {
throw new KsqlException(String.format("Metastore backups is enabled, but location "
+ "is empty. Please specify the location with the property '%s'",
KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION));
}

commandTopicBackup = new CommandTopicBackupImpl(
ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION),
commandTopicName)
;
}

return new CommandStore(
commandTopicName,
new CommandTopic(
Expand All @@ -134,7 +122,8 @@ public static CommandStore create(
commandQueueCatchupTimeout,
InternalTopicSerdes.serializer(),
InternalTopicSerdes.serializer(),
InternalTopicSerdes.deserializer(CommandId.class)
InternalTopicSerdes.deserializer(CommandId.class),
commandTopicBackup
);
}
}
Expand All @@ -148,7 +137,8 @@ public static CommandStore create(
final Duration commandQueueCatchupTimeout,
final Serializer<CommandId> commandIdSerializer,
final Serializer<Command> commandSerializer,
final Deserializer<CommandId> commandIdDeserializer
final Deserializer<CommandId> commandIdDeserializer,
final CommandTopicBackup commandTopicBackup
) {
this.commandTopic = Objects.requireNonNull(commandTopic, "commandTopic");
this.commandStatusMap = Maps.newConcurrentMap();
Expand All @@ -167,6 +157,7 @@ public static CommandStore create(
Objects.requireNonNull(commandSerializer, "commandSerializer");
this.commandIdDeserializer =
Objects.requireNonNull(commandIdDeserializer, "commandIdDeserializer");
this.commandTopicBackup = Objects.requireNonNull(commandTopicBackup, "commandTopicBackup");
}

@Override
Expand Down Expand Up @@ -338,6 +329,11 @@ private long getCommandTopicOffset() {
}
}

@Override
public boolean isCorrupted() {
return commandTopicBackup.commandTopicCorruption();
}

@Override
public boolean isEmpty() {
return commandTopic.getEndOffset() == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class DistributingExecutor {
private final CommandIdAssigner commandIdAssigner;
private final ReservedInternalTopics internalTopics;
private final Errors errorHandler;
private final Supplier<Boolean> commandRunnerDegraded;
private final Supplier<String> commandRunnerWarning;

public DistributingExecutor(
final KsqlConfig ksqlConfig,
Expand All @@ -70,7 +70,7 @@ public DistributingExecutor(
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final ValidatedCommandFactory validatedCommandFactory,
final Errors errorHandler,
final Supplier<Boolean> commandRunnerDegraded
final Supplier<String> commandRunnerWarning
) {
this.commandQueue = commandQueue;
this.distributedCmdResponseTimeout =
Expand All @@ -86,8 +86,8 @@ public DistributingExecutor(
this.internalTopics =
new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.commandRunnerDegraded =
Objects.requireNonNull(commandRunnerDegraded, "commandRunnerDegraded");
this.commandRunnerWarning =
Objects.requireNonNull(commandRunnerWarning, "commandRunnerWarning");
}

/**
Expand All @@ -105,10 +105,11 @@ public Optional<KsqlEntity> execute(
final KsqlExecutionContext executionContext,
final KsqlSecurityContext securityContext
) {
if (commandRunnerDegraded.get()) {
final String commandRunnerWarningString = commandRunnerWarning.get();
if (!commandRunnerWarningString.equals("")) {
throw new KsqlServerException("Failed to handle Ksql Statement."
+ System.lineSeparator()
+ errorHandler.commandRunnerDegradedErrorMessage());
+ commandRunnerWarningString);
}
final ConfiguredStatement<?> injected = injectorFactory
.apply(executionContext, securityContext.getServiceContext())
Expand Down
Loading