Skip to content

Commit

Permalink
feat: add commandRunnerCheck to healthcheck detail (#6346)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Oct 2, 2020
1 parent 28b8486 commit 5f64d05
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.ServerUtil;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -47,27 +48,32 @@ public class HealthCheckAgent {

public static final String METASTORE_CHECK_NAME = "metastore";
public static final String KAFKA_CHECK_NAME = "kafka";
public static final String COMMAND_RUNNER_CHECK_NAME = "commandRunner";

private static final List<Check> DEFAULT_CHECKS = ImmutableList.of(
new ExecuteStatementCheck(METASTORE_CHECK_NAME, "list streams; list tables; list queries;"),
new KafkaBrokerCheck(KAFKA_CHECK_NAME)
new KafkaBrokerCheck(KAFKA_CHECK_NAME),
new CommandRunnerCheck(COMMAND_RUNNER_CHECK_NAME)
);

private final SimpleKsqlClient ksqlClient;
private final URI serverEndpoint;
private final ServiceContext serviceContext;
private final KsqlConfig ksqlConfig;
private final CommandRunner commandRunner;

public HealthCheckAgent(
final SimpleKsqlClient ksqlClient,
final KsqlRestConfig restConfig,
final ServiceContext serviceContext,
final KsqlConfig ksqlConfig
final KsqlConfig ksqlConfig,
final CommandRunner commandRunner
) {
this.ksqlClient = Objects.requireNonNull(ksqlClient, "ksqlClient");
this.serverEndpoint = ServerUtil.getServerAddress(restConfig);
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
}

public HealthCheckResponse checkHealth() {
Expand Down Expand Up @@ -155,4 +161,24 @@ public HealthCheckResponseDetail check(final HealthCheckAgent healthCheckAgent)
return new HealthCheckResponseDetail(isHealthy);
}
}

private static class CommandRunnerCheck implements Check {
private final String name;

CommandRunnerCheck(final String name) {
this.name = Objects.requireNonNull(name, "name");
}

@Override
public String getName() {
return name;
}

@Override
public HealthCheckResponseDetail check(final HealthCheckAgent healthCheckAgent) {
return new HealthCheckResponseDetail(
healthCheckAgent.commandRunner.checkCommandRunnerStatus()
== CommandRunner.CommandRunnerStatus.RUNNING);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ public static SourceName getCommandsStreamName() {
ksqlResource,
serviceContext,
this.restConfig,
this.ksqlConfigNoPort);
this.ksqlConfigNoPort,
this.commandRunner);
this.queryMonitor = requireNonNull(ksqlQueryMonitor, "ksqlQueryMonitor");
MetricCollectors.addConfigurableReporter(ksqlConfigNoPort);
this.pullQueryMetrics = requireNonNull(pullQueryMetrics, "pullQueryMetrics");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.healthcheck.HealthCheckAgent;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
Expand Down Expand Up @@ -62,13 +63,17 @@ public static HealthCheckResource create(
final KsqlResource ksqlResource,
final ServiceContext serviceContext,
final KsqlRestConfig restConfig,
final KsqlConfig ksqlConfig
final KsqlConfig ksqlConfig,
final CommandRunner commandRunner
) {
return new HealthCheckResource(
new HealthCheckAgent(
new ServerInternalKsqlClient(ksqlResource,
new KsqlSecurityContext(Optional.empty(), serviceContext)),
restConfig, serviceContext, ksqlConfig),
restConfig,
serviceContext,
ksqlConfig,
commandRunner),
Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.KAFKA_CHECK_NAME;
import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.METASTORE_CHECK_NAME;
import static io.confluent.ksql.rest.healthcheck.HealthCheckAgent.COMMAND_RUNNER_CHECK_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
Expand All @@ -35,6 +36,7 @@
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.SimpleKsqlClient;
Expand Down Expand Up @@ -78,6 +80,8 @@ public class HealthCheckAgentTest {
@Mock
private Admin adminClient;
@Mock
private CommandRunner commandRunner;
@Mock
private RestResponse<KsqlEntityList> successfulResponse;
@Mock
private RestResponse<KsqlEntityList> unSuccessfulResponse;
Expand All @@ -95,13 +99,14 @@ public void setUp() {
final DescribeTopicsResult topicsResult = mock(DescribeTopicsResult.class);
when(adminClient.describeTopics(any(), any())).thenReturn(topicsResult);
when(topicsResult.all()).thenReturn(KafkaFuture.completedFuture(Collections.emptyMap()));
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.RUNNING);

final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.KSQL_SERVICE_ID_CONFIG,
"default_"
));

healthCheckAgent = new HealthCheckAgent(ksqlClient, restConfig, serviceContext, ksqlConfig);
healthCheckAgent = new HealthCheckAgent(ksqlClient, restConfig, serviceContext, ksqlConfig, commandRunner);
}

@Test
Expand All @@ -113,6 +118,7 @@ public void shouldCheckHealth() {
verify(ksqlClient, atLeastOnce()).makeKsqlRequest(eq(SERVER_URI), any(), eq(REQUEST_PROPERTIES));
assertThat(response.getDetails().get(METASTORE_CHECK_NAME).getIsHealthy(), is(true));
assertThat(response.getDetails().get(KAFKA_CHECK_NAME).getIsHealthy(), is(true));
assertThat(response.getDetails().get(COMMAND_RUNNER_CHECK_NAME).getIsHealthy(), is(true));
assertThat(response.getIsHealthy(), is(true));
}

Expand Down Expand Up @@ -143,6 +149,19 @@ public void shouldReturnUnhealthyIfKafkaCheckFails() {
assertThat(response.getIsHealthy(), is(false));
}

@Test
public void shouldReturnUnhealthyIfCommandRunnerCheckFails() {
// Given:
when(commandRunner.checkCommandRunnerStatus()).thenReturn(CommandRunner.CommandRunnerStatus.DEGRADED);

// When:
final HealthCheckResponse response = healthCheckAgent.checkHealth();

// Then:
assertThat(response.getDetails().get(COMMAND_RUNNER_CHECK_NAME).getIsHealthy(), is(false));
assertThat(response.getIsHealthy(), is(false));
}

@Test
public void shouldReturnHealthyIfKafkaCheckFailsWithAuthorizationException() {
// Given:
Expand Down

0 comments on commit 5f64d05

Please sign in to comment.