Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add health check endpoint #3501

Merged
merged 23 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fd22534
feat: add healthcheck endpoint
vcrfxia Oct 7, 2019
6d10311
feat: switch to configexception
vcrfxia Oct 8, 2019
5abf820
feat: make healthcheck interval configurable
vcrfxia Oct 8, 2019
4cebe97
feat: update healthcheck response detail to be a class
vcrfxia Oct 8, 2019
34441bd
feat: healthcheck should not be authenticated
vcrfxia Oct 8, 2019
89297e6
Merge branch 'master' into healthcheck-endpoint
vcrfxia Oct 9, 2019
47818cf
refactor: split out KsqlTarget helper methods
vcrfxia Oct 9, 2019
d619f74
fix: use ServerInternalKsqlClient to get around auth issues
vcrfxia Oct 9, 2019
612f0c0
fix: make Check an interface
vcrfxia Oct 10, 2019
f795d34
fix: move cached response expiry into ResponseCache
vcrfxia Oct 10, 2019
5fa0002
docs: add java doc to ServerInternalKsqlClient
vcrfxia Oct 10, 2019
e8e0647
refactor: make health check two words
vcrfxia Oct 10, 2019
fc0bbba
docs: add docs for new endpoint
vcrfxia Oct 10, 2019
cae31e6
docs: jim's feedback
vcrfxia Oct 10, 2019
548ca3c
refactor: switch to Guava's cache implementation
vcrfxia Oct 14, 2019
3a754bd
fix: sergio's feedback
vcrfxia Oct 14, 2019
ef0797f
fix: checkstyle
vcrfxia Oct 14, 2019
2611061
fix: fix bug in getting entity from response. add integration test
vcrfxia Oct 15, 2019
6d2103b
test: vinoth's feedback
vcrfxia Oct 15, 2019
a678d9e
refactor: switch to LoadingCache
vcrfxia Oct 15, 2019
01cf409
style: checkstyle
vcrfxia Oct 15, 2019
8b176f3
fix: findbugs
vcrfxia Oct 15, 2019
0db13de
fix: make check names public so clients don't have to hardcode
vcrfxia Oct 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.healthcheck;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.HealthcheckResponse;
import io.confluent.ksql.rest.entity.HealthcheckResponseDetail;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.rest.RestConfig;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

public class HealthcheckAgent {
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved

private static final List<Check> DEFAULT_CHECKS = ImmutableList.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a bit more abstract. You just need an interface like:

interface Check {
    String name();
    HealthcheckResponseDetail check();
}

The current Check class can just be an implementation of this (e.g. ExecuteStatementCheck). This way we can support more diverse extensions down the road.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, though I updated the signature of the check() method to take a KSQL client and server endpoint, to avoid needing to pass those into each of the individual checks.

new Check("metastore", "list streams; list tables; list queries;"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to make all these public static String variables from the HealthCheckAgent class. That way the constants can also be used in the test for this class and there won't be hardcoded strings split between this class and test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels odd to me to have these statements in non-private variables, since no other class (besides the test class) needs to access them. If the issue is the hard-coded strings, I can pull them out into private (static) variables, but they'd be duplicated in the test anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking if we were going to make a change to which statements to execute for the health check, having it not be hardcoded in the two files would make things easier to change. It's only a minor optimization though so if you don't think it's worth it then I'm fine with how it is now.

new Check("kafka", "list topics extended;")
);

private final ServiceContext serviceContext;
private final URI serverEndpoint;

public HealthcheckAgent(final ServiceContext serviceContext, final KsqlRestConfig restConfig) {
this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
this.serverEndpoint = getServerAddress(restConfig);
}

public HealthcheckResponse checkHealth() {
final Map<String, HealthcheckResponseDetail> results = DEFAULT_CHECKS.stream()
.collect(Collectors.toMap(
Check::getName,
check -> new HealthcheckResponseDetail(isSuccessful(check.getKsqlStatement()))
));
final boolean allHealthy = results.values().stream()
.map(HealthcheckResponseDetail::getIsHealthy)
.reduce(Boolean::logicalAnd)
.orElse(true);
return new HealthcheckResponse(allHealthy, results);
}

private boolean isSuccessful(final String ksqlStatement) {
final RestResponse<KsqlEntityList> response =
serviceContext.getKsqlClient().makeKsqlRequest(serverEndpoint, ksqlStatement);
return response.isSuccessful();
}

private static URI getServerAddress(final KsqlRestConfig restConfig) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't actually needed anymore since the ServerInternalKsqlClient used by the HealthcheckAgent ignores the serverEndpoint parameter in makeKsqlRequest() but I've left this code here in case we'd like to switch back to using a real KSQL client in the future. If preferable I can simply delete it for now instead.

final List<String> listeners = restConfig.getList(RestConfig.LISTENERS_CONFIG);
final String address = listeners.stream()
.map(String::trim)
.findFirst()
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved
.orElseThrow(() -> invalidAddressException(listeners, "value cannot be empty"));

try {
return new URL(address).toURI();
} catch (final Exception e) {
throw invalidAddressException(listeners, e.getMessage());
}
}

private static RuntimeException invalidAddressException(
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved
final List<String> serverAddresses,
final String message
) {
return new ConfigException(RestConfig.LISTENERS_CONFIG, serverAddresses, message);
}

private static class Check {
private final String name;
private final String ksqlStatement;

Check(final String name, final String ksqlStatement) {
this.name = Objects.requireNonNull(name, "name");
this.ksqlStatement = Objects.requireNonNull(ksqlStatement, "ksqlStatement");
}

String getName() {
return name;
}

String getKsqlStatement() {
return ksqlStatement;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.rest.server.computation.StatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.HealthcheckResource;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper;
import io.confluent.ksql.rest.server.resources.KsqlResource;
Expand Down Expand Up @@ -200,6 +201,7 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap
config.register(statusResource);
config.register(ksqlResource);
config.register(streamedQueryResource);
config.register(HealthcheckResource.create(serviceContext, this.config));
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved
config.register(new KsqlExceptionMapper());
config.register(new ServerStateDynamicBinding(serverState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class KsqlRestConfig extends RestConfig {
"Whether or not to set KsqlUncaughtExceptionHandler as the UncaughtExceptionHandler "
+ "for all threads in the application (this can be overridden). Default is false.";

public static final String KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG =
KSQL_CONFIG_PREFIX + "healthcheck.interval.ms";
private static final String KSQL_HEALTHCHECK_INTERVAL_MS_DOC =
"Minimum time between consecutive healthcheck evaluations. Healthcheck queries before "
+ "the interval has elapsed will receive cached responses.";

private static final ConfigDef CONFIG_DEF;

static {
Expand Down Expand Up @@ -102,14 +108,20 @@ public class KsqlRestConfig extends RestConfig {
KSQL_SERVER_PRECONDITIONS,
Type.LIST,
"",
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
Importance.LOW,
KSQL_SERVER_PRECONDITIONS_DOC
).define(
KSQL_SERVER_ENABLE_UNCAUGHT_EXCEPTION_HANDLER,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
KSQL_SERVER_UNCAUGHT_EXCEPTION_HANDLER_DOC
).define(
KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG,
Type.LONG,
5000L,
Importance.LOW,
KSQL_HEALTHCHECK_INTERVAL_MS_DOC
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.rest.server.filters;

import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.HealthcheckResource;
import io.confluent.ksql.rest.server.resources.ServerMetadataResource;
import io.confluent.ksql.security.KsqlAuthorizationProvider;
import java.lang.reflect.Method;
Expand All @@ -39,8 +40,10 @@
public class KsqlAuthorizationFilter implements ContainerRequestFilter {
private static final Logger log = LoggerFactory.getLogger(KsqlAuthorizationFilter.class);

private static final Set<String> PATHS_WITHOUT_AUTHORIZATION =
getPathsFrom(ServerMetadataResource.class);
private static final Set<String> PATHS_WITHOUT_AUTHORIZATION = getPathsFrom(
ServerMetadataResource.class,
HealthcheckResource.class
);

private final KsqlAuthorizationProvider authorizationProvider;

Expand Down Expand Up @@ -75,17 +78,20 @@ private boolean requiresAuthorization(final String path) {
return !PATHS_WITHOUT_AUTHORIZATION.contains(path);
}

private static Set<String> getPathsFrom(final Class<?> resourceClass) {
private static Set<String> getPathsFrom(final Class<?> ...resourceClass) {

final Set<String> paths = new HashSet<>();
final String mainPath = StringUtils.stripEnd(
resourceClass.getAnnotation(Path.class).value(), "/"
);
for (final Class<?> clazz : resourceClass) {
final String mainPath = StringUtils.stripEnd(
clazz.getAnnotation(Path.class).value(), "/"
);

paths.add(mainPath);
for (Method m : resourceClass.getMethods()) {
if (m.isAnnotationPresent(Path.class)) {
paths.add(mainPath + "/"
+ StringUtils.strip(m.getAnnotation(Path.class).value(), "/"));
paths.add(mainPath);
for (Method m : clazz.getMethods()) {
if (m.isAnnotationPresent(Path.class)) {
paths.add(mainPath + "/"
+ StringUtils.strip(m.getAnnotation(Path.class).value(), "/"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.entity.HealthcheckResponse;
import io.confluent.ksql.rest.entity.Versions;
import io.confluent.ksql.rest.healthcheck.HealthcheckAgent;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.services.ServiceContext;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Supplier;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/healthcheck")
@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON})
public class HealthcheckResource {
private final HealthcheckAgent healthcheckAgent;
private final Duration healthcheckInterval;
private final Supplier<Long> currentTimeSupplier;
private final ResponseCache responseCache;

@VisibleForTesting
HealthcheckResource(
final HealthcheckAgent healthcheckAgent,
final Duration healthcheckInterval,
final Supplier<Long> currentTimeSupplier
) {
this.healthcheckAgent = Objects.requireNonNull(healthcheckAgent, "healthcheckAgent");
this.healthcheckInterval = Objects.requireNonNull(healthcheckInterval, "healthcheckInterval");
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier, "currentTimeSupplier");
this.responseCache = new ResponseCache(currentTimeSupplier);
}

@GET
public Response checkHealth() {
return Response.ok(getResponse()).build();
}

private HealthcheckResponse getResponse() {
if (responseCache.isEmpty() || timeSinceLastResponse().compareTo(healthcheckInterval) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be cleaner to move the expiry into the cache, e.g.:

final Optional<HealthCheckResponse> response = responseCache.get();
if (response.isPresent()) {
    return response.get();
} 
final HealthcheckResponse fresh = healtcheckAgent.checkHealth();
responseCache.cache(fresh);
return fresh;

and in the Cache, get() can return empty if nothing is cached or if its expired.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, though I wonder if it's clear from just reading

final Optional<HealthCheckResponse> response = responseCache.get();
if (response.isPresent()) {
    return response.get();
} 

that the cache handles expiration (rather than returning any previously cached response). Maybe I should update the cache name to be ExpiringResponseCache or something similar?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe if we change the get() method to getIfPresent() it'll be clearer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the fact that the return type is an Optional already indicates that "if present" part. It was the fact that the cache expires that I didn't think was captured. I don't want to block the PR on this though, can always open a follow-up to change the name.

Copy link
Member

@stevenpyzhang stevenpyzhang Oct 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getIfUnexpired() then to indicate expiration behavior? I do agree this is a minor point.

final HealthcheckResponse response = healthcheckAgent.checkHealth();
responseCache.cache(response);
return response;
} else {
return responseCache.lastResponse();
}
}

private Duration timeSinceLastResponse() {
return Duration.ofMillis(currentTimeSupplier.get() - responseCache.lastTimestamp());
}

public static HealthcheckResource create(
stevenpyzhang marked this conversation as resolved.
Show resolved Hide resolved
final ServiceContext serviceContext,
final KsqlRestConfig restConfig
) {
return new HealthcheckResource(
new HealthcheckAgent(serviceContext, restConfig),
Duration.ofMillis(restConfig.getLong(KsqlRestConfig.KSQL_HEALTHCHECK_INTERVAL_MS_CONFIG)),
System::currentTimeMillis
);
}

private static class ResponseCache {
private final Supplier<Long> currentTimeSupplier;
private HealthcheckResponse response;
private long timestamp;

ResponseCache(final Supplier<Long> currentTimeSupplier) {
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier, "currentTimeSupplier");
}

void cache(final HealthcheckResponse response) {
this.response = response;
this.timestamp = currentTimeSupplier.get();
}

boolean isEmpty() {
return response == null;
}

HealthcheckResponse lastResponse() {
return response;
}

long lastTimestamp() {
return timestamp;
}
}
}
Loading