Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add logging during restore #4270

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: add logging during restore
Fixes: #4269
  • Loading branch information
big-andy-coates committed Jan 10, 2020
commit 5a144d70c60b7da8cb290b7e1657fe598ae4da18
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
Expand All @@ -42,12 +41,13 @@

/**
* Handles the logic of reading distributed commands, including pre-existing commands that were
* issued before being initialized, and then delegating their execution to a
* {@link InteractiveStatementExecutor}.
* Also responsible for taking care of any exceptions that occur in the process.
* issued before being initialized, and then delegating their execution to a {@link
* InteractiveStatementExecutor}. Also responsible for taking care of any exceptions that occur in
* the process.
*/
public class CommandRunner implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CommandRunner.class);

private static final Logger LOG = LoggerFactory.getLogger(CommandRunner.class);

private static final int STATEMENT_RETRY_MS = 100;
private static final int MAX_STATEMENT_RETRY_MS = 5 * 1000;
Expand Down Expand Up @@ -152,27 +152,46 @@ public void close() {
* Read and execute all commands on the command topic, starting at the earliest offset.
*/
public void processPriorCommands() {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();
final Optional<QueuedCommand> terminateCmd = findTerminateCommand(restoreCommands);
if (terminateCmd.isPresent()) {
terminateCluster(terminateCmd.get().getCommand());
return;
try {
final List<QueuedCommand> restoreCommands = commandStore.getRestoreCommands();

LOG.info("Restoring previous state from {} commands.", restoreCommands.size());

final Optional<QueuedCommand> terminateCmd = findTerminateCommand(restoreCommands);
if (terminateCmd.isPresent()) {
LOG.info("Cluster previously terminated: terminating.");
terminateCluster(terminateCmd.get().getCommand());
return;
}

restoreCommands.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
}
);

final List<PersistentQueryMetadata> queries = statementExecutor
.getKsqlEngine()
.getPersistentQueries();

LOG.info("Restarting {} queries.", queries.size());

queries.forEach(PersistentQueryMetadata::start);

LOG.info("Restore complete");

} catch (final Exception e) {
LOG.error("Error during restore", e);
throw e;
}
restoreCommands.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
WakeupException.class
);
currentCommandRef.set(null);
}
);
final KsqlEngine ksqlEngine = statementExecutor.getKsqlEngine();
ksqlEngine.getPersistentQueries().forEach(PersistentQueryMetadata::start);
}

void fetchAndRunCommands() {
Expand All @@ -187,7 +206,7 @@ void fetchAndRunCommands() {
return;
}

log.trace("Found {} new writes to command topic", commands.size());
LOG.debug("Found {} new writes to command topic", commands.size());
for (final QueuedCommand command : commands) {
if (closed) {
return;
Expand All @@ -198,14 +217,14 @@ void fetchAndRunCommands() {
}

private void executeStatement(final QueuedCommand queuedCommand) {
log.info("Executing statement: " + queuedCommand.getCommand().getStatement());
LOG.info("Executing statement: " + queuedCommand.getCommand().getStatement());

final Runnable task = () -> {
if (closed) {
log.info("Execution aborted as system is closing down");
LOG.info("Execution aborted as system is closing down");
} else {
statementExecutor.handleStatement(queuedCommand);
log.info("Executed statement: " + queuedCommand.getCommand().getStatement());
LOG.info("Executed statement: " + queuedCommand.getCommand().getStatement());
}
};

Expand All @@ -232,13 +251,13 @@ private static Optional<QueuedCommand> findTerminateCommand(
@SuppressWarnings("unchecked")
private void terminateCluster(final Command command) {
serverState.setTerminating();
log.info("Terminating the KSQL server.");
LOG.info("Terminating the KSQL server.");
this.close();
final List<String> deleteTopicList = (List<String>) command.getOverwriteProperties()
.getOrDefault(ClusterTerminateRequest.DELETE_TOPIC_LIST_PROP, Collections.emptyList());

clusterTerminator.terminateCluster(deleteTopicList);
log.info("The KSQL server was terminated.");
LOG.info("The KSQL server was terminated.");
}

CommandRunnerStatus checkCommandRunnerStatus() {
Expand All @@ -252,17 +271,13 @@ CommandRunnerStatus checkCommandRunnerStatus() {
? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
}

Pair<QueuedCommand, Instant> getCurrentCommand() {
return currentCommandRef.get();
}

private class Runner implements Runnable {

@Override
public void run() {
try {
while (!closed) {
log.debug("Polling for new writes to command topic");
LOG.trace("Polling for new writes to command topic");
fetchAndRunCommands();
}
} catch (final WakeupException wue) {
Expand Down