Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't use queryId of last terminate command after restore #6278

Merged
merged 8 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
rebase
  • Loading branch information
vpapavas committed Sep 23, 2020
commit c7aaca98df597b12f7f44612041e60cfa5663d02
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
Expand Down Expand Up @@ -266,15 +267,15 @@ public void processPriorCommands() {
}

final List<QueuedCommand> compacted = compactor.apply(compatibleCommands);

final QueryId lastTerminateQueryId = RestoreCommandsCompactor.lastTerminateQueryId;
compacted.forEach(
command -> {
currentCommandRef.set(new Pair<>(command, clock.instant()));
RetryUtil.retryWithBackoff(
maxRetries,
STATEMENT_RETRY_MS,
MAX_STATEMENT_RETRY_MS,
() -> statementExecutor.handleRestore(command),
() -> statementExecutor.handleRestore(command, lastTerminateQueryId),
WakeupException.class
);
currentCommandRef.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,23 @@ void handleStatement(final QueuedCommand queuedCommand) {
queuedCommand.getAndDeserializeCommandId(),
queuedCommand.getStatus(),
Mode.EXECUTE,
queuedCommand.getOffset()
queuedCommand.getOffset(),
Optional.empty()
);
}

void handleRestore(final QueuedCommand queuedCommand) {
void handleRestore(final QueuedCommand queuedCommand, final QueryId lastTerminateQueryId) {
throwIfNotConfigured();

final Optional<QueryId> queryId = lastTerminateQueryId == null
? Optional.empty() : Optional.of(lastTerminateQueryId);
handleStatementWithTerminatedQueries(
queuedCommand.getAndDeserializeCommand(commandDeserializer),
queuedCommand.getAndDeserializeCommandId(),
queuedCommand.getStatus(),
Mode.RESTORE,
queuedCommand.getOffset()
queuedCommand.getOffset(),
queryId
);
}

Expand Down Expand Up @@ -191,11 +195,19 @@ private void handleStatementWithTerminatedQueries(
final CommandId commandId,
final Optional<CommandStatusFuture> commandStatusFuture,
final Mode mode,
final long offset
final long offset,
final Optional<QueryId> lastTerminateQueryId
) {
try {
if (command.getPlan().isPresent()) {
executePlan(command, commandId, commandStatusFuture, command.getPlan().get(), mode, offset);
executePlan(
command,
commandId,
commandStatusFuture,
command.getPlan().get(),
mode,
offset,
lastTerminateQueryId);
return;
}
final String statementString = command.getStatement();
Expand Down Expand Up @@ -228,7 +240,8 @@ private void executePlan(
final Optional<CommandStatusFuture> commandStatusFuture,
final KsqlPlan plan,
final Mode mode,
final long offset
final long offset,
final Optional<QueryId> lastTerminateQueryId
) {
final KsqlConfig mergedConfig = buildMergedConfig(command);
final ConfiguredKsqlPlan configured = ConfiguredKsqlPlan.of(
Expand All @@ -242,7 +255,19 @@ private void executePlan(
);
final ExecuteResult result = ksqlEngine.execute(serviceContext, configured);
if (result.getQuery().isPresent()) {
queryIdGenerator.setNextId(offset + 1);
long queryID = Long.MIN_VALUE;
if (lastTerminateQueryId.isPresent()) {
final String ltq = lastTerminateQueryId.get().toString();
final int lastIndex = ltq.lastIndexOf("_");
queryID = Long.parseLong(ltq.substring(lastIndex + 1));
}
// We increase the queryID by 1 if the last command was a terminate,
// to avoid the new command getting the same queryId.
if (offset > 0 && offset == queryID - 1) {
queryIdGenerator.setNextId(queryID + 1);
} else {
queryIdGenerator.setNextId(offset + 1);
}
if (mode == Mode.EXECUTE) {
result.getQuery().get().start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Util for compacting the restore commands
*/
public final class RestoreCommandsCompactor {

static QueryId lastTerminateQueryId;
private static final Logger LOG = LoggerFactory.getLogger(RestoreCommandsCompactor.class);

private RestoreCommandsCompactor() {
}

Expand Down Expand Up @@ -86,6 +91,8 @@ public static CompactedNode maybeAppend(
if (queued.getAndDeserializeCommandId().getType() == Type.TERMINATE) {
final QueryId queryId = new QueryId(queued.getAndDeserializeCommandId().getEntity());
markShouldSkip(queryId, latestNodeWithId);
//keep track of the last terminate command
lastTerminateQueryId = queryId;

// terminate commands don't get added to the list of commands to execute
// because we "execute" them in this class by removing query plans from
Expand Down Expand Up @@ -151,4 +158,9 @@ private static Optional<QueuedCommand> compact(final CompactedNode node) {
node.queued.getOffset()
));
}

static QueryId getLastTerminateQueryId() {
return lastTerminateQueryId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
Expand Down Expand Up @@ -166,9 +165,9 @@ public void shouldRunThePriorCommandsCorrectly() {

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any());
vpapavas marked this conversation as resolved.
Show resolved Hide resolved
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any());
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any());
}

@Test
Expand All @@ -186,7 +185,7 @@ public void shouldRunThePriorCommandsWithTerminateCorrectly() {
inOrder.verify(commandStore).wakeup();
inOrder.verify(clusterTerminator).terminateCluster(anyList());

verify(statementExecutor, never()).handleRestore(any());
verify(statementExecutor, never()).handleRestore(any(), any());
}

@Test
Expand All @@ -199,7 +198,7 @@ public void shouldEarlyOutIfRestoreContainsTerminate() {
commandRunner.processPriorCommands();

// Then:
verify(statementExecutor, never()).handleRestore(any());
verify(statementExecutor, never()).handleRestore(any(), any());
}

@Test
Expand All @@ -224,10 +223,10 @@ public void shouldOnlyRestoreCompacted() {

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any());
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand3), any());

verify(statementExecutor, never()).handleRestore(queuedCommand2);
verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any());
}

@Test
Expand All @@ -244,13 +243,14 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInRestor

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any());
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any());

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND));
verify(statementExecutor, never()).handleRestore(queuedCommand3);

verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any());
}

@Test
Expand Down Expand Up @@ -287,13 +287,13 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() {

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1), any());
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2), any());

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
assertThat(commandRunner.getCommandRunnerDegradedWarning(), is(INCOMPATIBLE_COMMANDS_ERROR_MESSAGE));
assertThat(commandRunner.getCommandRunnerDegradedReason(), is(CommandRunner.CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND));
verify(statementExecutor, never()).handleRestore(queuedCommand3);
verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any());
}

@Test
Expand Down Expand Up @@ -414,9 +414,9 @@ public void shouldEarlyOutIfNewCommandsContainsTerminate() {
commandRunner.fetchAndRunCommands();

// Then:
verify(statementExecutor, never()).handleRestore(queuedCommand1);
verify(statementExecutor, never()).handleRestore(queuedCommand2);
verify(statementExecutor, never()).handleRestore(queuedCommand3);
verify(statementExecutor, never()).handleRestore(eq(queuedCommand1), any());
verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any());
verify(statementExecutor, never()).handleRestore(eq(queuedCommand3), any());
}

@Test
Expand Down Expand Up @@ -483,7 +483,7 @@ public void shouldEarlyOutOnShutdown() {
commandRunner.fetchAndRunCommands();

// Then:
verify(statementExecutor, never()).handleRestore(queuedCommand2);
verify(statementExecutor, never()).handleRestore(eq(queuedCommand2), any());
}

@Test
Expand Down Expand Up @@ -525,7 +525,7 @@ public void shouldNotStartCommandRunnerThreadIfSerializationExceptionInRestore()
inOrder.verify(executor).awaitTermination(anyLong(), any());
inOrder.verify(commandStore).close();
verify(commandStore, never()).getNewCommands(any());
verify(statementExecutor, times(2)).handleRestore(any());
verify(statementExecutor, times(2)).handleRestore(any(), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void shouldThrowOnHandleRestoreIfNotConfigured() {
);

// When:
statementExecutor.handleRestore(queuedCommand);
statementExecutor.handleRestore(queuedCommand, null);
}

@Test
Expand Down Expand Up @@ -548,7 +548,9 @@ public void shouldHandlePriorStatements() {
for (int i = 0; i < priorCommands.size(); i++) {
final Pair<CommandId, Command> pair = priorCommands.get(i);
statementExecutor.handleRestore(
new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i)
new QueuedCommand(pair.left, pair.right, Optional.empty(), (long) i),
null

);
}

Expand Down Expand Up @@ -632,7 +634,8 @@ public void shouldSetNextQueryIdToNextOffsetWhenExecutingRestoreCommand() {
command,
Optional.empty(),
2L
)
),
null
);

// Then:
Expand All @@ -655,7 +658,8 @@ public void shouldSkipStartWhenReplayingLog() {
command,
Optional.empty(),
0L
)
),
null
);

// Then:
Expand Down