diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java index ecb0fbd80fb5..884efd35b7f9 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgent.java @@ -25,6 +25,7 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.ServerUtil; +import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.SimpleKsqlClient; import io.confluent.ksql.util.KsqlConfig; @@ -47,27 +48,32 @@ public class HealthCheckAgent { public static final String METASTORE_CHECK_NAME = "metastore"; public static final String KAFKA_CHECK_NAME = "kafka"; + public static final String COMMAND_RUNNER_CHECK_NAME = "commandRunner"; private static final List DEFAULT_CHECKS = ImmutableList.of( new ExecuteStatementCheck(METASTORE_CHECK_NAME, "list streams; list tables; list queries;"), - new KafkaBrokerCheck(KAFKA_CHECK_NAME) + new KafkaBrokerCheck(KAFKA_CHECK_NAME), + new CommandRunnerCheck(COMMAND_RUNNER_CHECK_NAME) ); private final SimpleKsqlClient ksqlClient; private final URI serverEndpoint; private final ServiceContext serviceContext; private final KsqlConfig ksqlConfig; + private final CommandRunner commandRunner; public HealthCheckAgent( final SimpleKsqlClient ksqlClient, final KsqlRestConfig restConfig, final ServiceContext serviceContext, - final KsqlConfig ksqlConfig + final KsqlConfig ksqlConfig, + final CommandRunner commandRunner ) { this.ksqlClient = Objects.requireNonNull(ksqlClient, "ksqlClient"); this.serverEndpoint = ServerUtil.getServerAddress(restConfig); this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); + this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); } public HealthCheckResponse checkHealth() { @@ -155,4 +161,24 @@ public HealthCheckResponseDetail check(final HealthCheckAgent healthCheckAgent) return new HealthCheckResponseDetail(isHealthy); } } + + private static class CommandRunnerCheck implements Check { + private final String name; + + CommandRunnerCheck(final String name) { + this.name = Objects.requireNonNull(name, "name"); + } + + @Override + public String getName() { + return name; + } + + @Override + public HealthCheckResponseDetail check(final HealthCheckAgent healthCheckAgent) { + return new HealthCheckResponseDetail( + healthCheckAgent.commandRunner.checkCommandRunnerStatus() + == CommandRunner.CommandRunnerStatus.RUNNING); + } + } } \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 32f967ed777c..335065b17ba3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -277,7 +277,8 @@ public static SourceName getCommandsStreamName() { ksqlResource, serviceContext, this.restConfig, - this.ksqlConfigNoPort); + this.ksqlConfigNoPort, + this.commandRunner); this.queryMonitor = requireNonNull(ksqlQueryMonitor, "ksqlQueryMonitor"); MetricCollectors.addConfigurableReporter(ksqlConfigNoPort); this.pullQueryMetrics = requireNonNull(pullQueryMetrics, "pullQueryMetrics"); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HealthCheckResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HealthCheckResource.java index fbbfad578a27..ff7b211944d5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HealthCheckResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HealthCheckResource.java @@ -23,6 +23,7 @@ import io.confluent.ksql.rest.entity.HealthCheckResponse; import io.confluent.ksql.rest.healthcheck.HealthCheckAgent; import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient; import io.confluent.ksql.security.KsqlSecurityContext; import io.confluent.ksql.services.ServiceContext; @@ -62,13 +63,17 @@ public static HealthCheckResource create( final KsqlResource ksqlResource, final ServiceContext serviceContext, final KsqlRestConfig restConfig, - final KsqlConfig ksqlConfig + final KsqlConfig ksqlConfig, + final CommandRunner commandRunner ) { return new HealthCheckResource( new HealthCheckAgent( new ServerInternalKsqlClient(ksqlResource, new KsqlSecurityContext(Optional.empty(), serviceContext)), - restConfig, serviceContext, ksqlConfig), + restConfig, + serviceContext, + ksqlConfig, + commandRunner), Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG)) ); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java index d29c04f2385e..d2f672a4f112 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthCheckAgentTest.java @@ -17,6 +17,7 @@ import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.KAFKA_CHECK_NAME; import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.METASTORE_CHECK_NAME; +import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.COMMAND_RUNNER_CHECK_NAME; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; @@ -35,6 +36,7 @@ import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.HealthCheckResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; +import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.SimpleKsqlClient; @@ -78,6 +80,8 @@ public class HealthCheckAgentTest { @Mock private Admin adminClient; @Mock + private CommandRunner commandRunner; + @Mock private RestResponse successfulResponse; @Mock private RestResponse unSuccessfulResponse; @@ -95,13 +99,14 @@ public void setUp() { final DescribeTopicsResult topicsResult = mock(DescribeTopicsResult.class); when(adminClient.describeTopics(any(), any())).thenReturn(topicsResult); when(topicsResult.all()).thenReturn(KafkaFuture.completedFuture(Collections.emptyMap())); + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING); final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of( KsqlConfig.KSQL_SERVICE_ID_CONFIG, "default_" )); - healthCheckAgent = new HealthCheckAgent(ksqlClient, restConfig, serviceContext, ksqlConfig); + healthCheckAgent = new HealthCheckAgent(ksqlClient, restConfig, serviceContext, ksqlConfig, commandRunner); } @Test @@ -113,6 +118,7 @@ public void shouldCheckHealth() { verify(ksqlClient, atLeastOnce()).makeKsqlRequest(eq(SERVER_URI), any(), eq(REQUEST_PROPERTIES)); assertThat(response.getDetails().get(METASTORE_CHECK_NAME).getIsHealthy(), is(true)); assertThat(response.getDetails().get(KAFKA_CHECK_NAME).getIsHealthy(), is(true)); + assertThat(response.getDetails().get(COMMAND_RUNNER_CHECK_NAME).getIsHealthy(), is(true)); assertThat(response.getIsHealthy(), is(true)); } @@ -143,6 +149,19 @@ public void shouldReturnUnhealthyIfKafkaCheckFails() { assertThat(response.getIsHealthy(), is(false)); } + @Test + public void shouldReturnUnhealthyIfCommandRunnerCheckFails() { + // Given: + when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.DEGRADED); + + // When: + final HealthCheckResponse response = healthCheckAgent.checkHealth(); + + // Then: + assertThat(response.getDetails().get(COMMAND_RUNNER_CHECK_NAME).getIsHealthy(), is(false)); + assertThat(response.getIsHealthy(), is(false)); + } + @Test public void shouldReturnHealthyIfKafkaCheckFailsWithAuthorizationException() { // Given: