Skip to content

Commit

Permalink
fix: don't cleanup topics on engine close (#4658)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohan <desai.p.rohan@gmail.com>
Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
3 people authored and colinhicks committed Feb 27, 2020
1 parent d96db14 commit ad66a81
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public QueryMetadata executeQuery(

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
allLiveQueries.forEach(QueryMetadata::stop);
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,12 @@ public QueryMetadata buildTransientQuery(
overrides,
queryCloseCallback,
ksqlConfig.getLong(KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG)
);
) {
@Override
public void stop() {
close();
}
};
}

private static Optional<MaterializationInfo> getMaterializationInfo(final Object result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,9 @@ public Optional<Materialization> getMaterialization(
) {
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
}

@Override
public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -169,14 +169,39 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link TransientQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
closeCallback.accept(this);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close(Duration.ofMillis(closeTimeout));

kafkaStreams.cleanUp();
if (cleanUp) {
kafkaStreams.cleanUp();
}

queryStateListener.ifPresent(QueryStateListener::close);

closeCallback.accept(this);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ public void setLimitHandler(final LimitHandler limitHandler) {
}

@Override
public void close() {
public void stop() {
close();
}

@Override
protected void doClose(final boolean cleanUp) {
// To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close();
super.doClose(cleanUp);
isRunning.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -675,6 +676,63 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final QueryMetadata query = KsqlEngineTestUtil.executeQuery(
serviceContext,
ksqlEngine,
"select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.start();

// When:
ksqlEngine.close();

// Then:
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
query.get(0).close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -60,9 +62,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -74,7 +78,12 @@ public void setup() {
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback,
closeTimeout);
closeTimeout) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -136,6 +145,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
query.stop();

// Then:
verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams).close(Duration.ofMillis(closeTimeout));
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -147,6 +174,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;

// When:
query.stop();

// Then:
verify(kafkaStreams).cleanUp();
}

@Test
public void shouldReturnSources() {
assertThat(query.getSourceNames(), is(SOME_SOURCES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;

import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
Expand Down Expand Up @@ -87,4 +88,17 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() {
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close(any());
}

@Test
public void shouldCallCloseOnStop() {
// When:
query.stop();

// Then:
final InOrder inOrder = inOrder(rowQueue, kafkaStreams, closeCallback);
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close(any());
inOrder.verify(kafkaStreams).cleanUp();
inOrder.verify(closeCallback).accept(query);
}
}

0 comments on commit ad66a81

Please sign in to comment.