Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add health check endpoint #3501

Merged
merged 23 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fd22534
feat: add healthcheck endpoint
vcrfxia Oct 7, 2019
6d10311
feat: switch to configexception
vcrfxia Oct 8, 2019
5abf820
feat: make healthcheck interval configurable
vcrfxia Oct 8, 2019
4cebe97
feat: update healthcheck response detail to be a class
vcrfxia Oct 8, 2019
34441bd
feat: healthcheck should not be authenticated
vcrfxia Oct 8, 2019
89297e6
Merge branch 'master' into healthcheck-endpoint
vcrfxia Oct 9, 2019
47818cf
refactor: split out KsqlTarget helper methods
vcrfxia Oct 9, 2019
d619f74
fix: use ServerInternalKsqlClient to get around auth issues
vcrfxia Oct 9, 2019
612f0c0
fix: make Check an interface
vcrfxia Oct 10, 2019
f795d34
fix: move cached response expiry into ResponseCache
vcrfxia Oct 10, 2019
5fa0002
docs: add java doc to ServerInternalKsqlClient
vcrfxia Oct 10, 2019
e8e0647
refactor: make health check two words
vcrfxia Oct 10, 2019
fc0bbba
docs: add docs for new endpoint
vcrfxia Oct 10, 2019
cae31e6
docs: jim's feedback
vcrfxia Oct 10, 2019
548ca3c
refactor: switch to Guava's cache implementation
vcrfxia Oct 14, 2019
3a754bd
fix: sergio's feedback
vcrfxia Oct 14, 2019
ef0797f
fix: checkstyle
vcrfxia Oct 14, 2019
2611061
fix: fix bug in getting entity from response. add integration test
vcrfxia Oct 15, 2019
6d2103b
test: vinoth's feedback
vcrfxia Oct 15, 2019
a678d9e
refactor: switch to LoadingCache
vcrfxia Oct 15, 2019
01cf409
style: checkstyle
vcrfxia Oct 15, 2019
8b176f3
fix: findbugs
vcrfxia Oct 15, 2019
0db13de
fix: make check names public so clients don't have to hardcode
vcrfxia Oct 16, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: split out KsqlTarget helper methods
  • Loading branch information
vcrfxia committed Oct 9, 2019
commit 47818cf91181dbc05b32ef46d346d2fe78e99e1e
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client;

import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import java.util.Optional;
import java.util.function.Function;
import javax.naming.AuthenticationException;
import javax.ws.rs.core.Response;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpStatus.Code;

public final class KsqlClientUtil {

private KsqlClientUtil() {
}

public static <T> RestResponse<T> toRestResponse(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contents of this new util file were lifted from KsqlTarget, so the methods may be accessible to the new ServerInternalKsqlClient as well.

final Response response,
final String path,
final Function<Response, T> mapper
) {
final Code statusCode = HttpStatus.getCode(response.getStatus());
return statusCode == Code.OK
? RestResponse.successful(statusCode, mapper.apply(response))
: createErrorResponse(path, response);
}

private static <T> RestResponse<T> createErrorResponse(
final String path,
final Response response
) {
final Code statusCode = HttpStatus.getCode(response.getStatus());
final Optional<KsqlErrorMessage> errorMessage = tryReadErrorMessage(response);
if (errorMessage.isPresent()) {
return RestResponse.erroneous(statusCode, errorMessage.get());
}

if (statusCode == Code.NOT_FOUND) {
return RestResponse.erroneous(statusCode,
"Path not found. Path='" + path + "'. "
+ "Check your ksql http url to make sure you are connecting to a ksql server."
);
}

if (statusCode == Code.UNAUTHORIZED) {
return RestResponse.erroneous(statusCode, unauthorizedErrorMsg());
}

if (statusCode == Code.FORBIDDEN) {
return RestResponse.erroneous(statusCode, forbiddenErrorMsg());
}

return RestResponse.erroneous(
statusCode,
"The server returned an unexpected error: "
+ response.getStatusInfo().getReasonPhrase());
}

private static Optional<KsqlErrorMessage> tryReadErrorMessage(final Response response) {
try {
return Optional.ofNullable(response.readEntity(KsqlErrorMessage.class));
} catch (final Exception e) {
return Optional.empty();
}
}

private static KsqlErrorMessage unauthorizedErrorMsg() {
return new KsqlErrorMessage(
Errors.ERROR_CODE_UNAUTHORIZED,
new AuthenticationException(
"Could not authenticate successfully with the supplied credentials.")
);
}

private static KsqlErrorMessage forbiddenErrorMsg() {
return new KsqlErrorMessage(
Errors.ERROR_CODE_FORBIDDEN,
new AuthenticationException("You are forbidden from using this cluster.")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
import static java.util.Objects.requireNonNull;

import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.ServerInfo;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.naming.AuthenticationException;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
Expand All @@ -39,8 +36,6 @@
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpStatus.Code;
import org.glassfish.jersey.client.ClientProperties;

@SuppressWarnings("WeakerAccess") // Public API
Expand Down Expand Up @@ -139,11 +134,7 @@ private <T> RestResponse<T> get(final String path, final Class<T> type) {
.headers(headers())
.get()
) {
final Code statusCode = HttpStatus.getCode(response.getStatus());
return statusCode == Code.OK
? RestResponse.successful(statusCode, response.readEntity(type))
: createErrorResponse(path, response);

return KsqlClientUtil.toRestResponse(response, path, r -> r.readEntity(type));
} catch (final Exception e) {
throw new KsqlRestClientException("Error issuing GET to KSQL server. path:" + path, e);
}
Expand All @@ -166,11 +157,7 @@ private <T> RestResponse<T> post(
.headers(headers())
.post(Entity.json(jsonEntity));

final Code statusCode = HttpStatus.getCode(response.getStatus());
return statusCode == Code.OK
? RestResponse.successful(statusCode, mapper.apply(response))
: createErrorResponse(path, response);

return KsqlClientUtil.toRestResponse(response, path, mapper);
} catch (final ProcessingException e) {
if (shouldRetry(readTimeoutMs, e)) {
return post(path, jsonEntity, calcReadTimeout(readTimeoutMs), closeResponse, mapper);
Expand Down Expand Up @@ -202,58 +189,4 @@ private static boolean shouldRetry(
private static Optional<Integer> calcReadTimeout(final Optional<Integer> previousTimeoutMs) {
return previousTimeoutMs.map(timeout -> Math.min(timeout * 2, MAX_TIMEOUT));
}

private static <T> RestResponse<T> createErrorResponse(
final String path,
final Response response
) {
final Code statusCode = HttpStatus.getCode(response.getStatus());
final Optional<KsqlErrorMessage> errorMessage = tryReadErrorMessage(response);
if (errorMessage.isPresent()) {
return RestResponse.erroneous(statusCode, errorMessage.get());
}

if (statusCode == Code.NOT_FOUND) {
return RestResponse.erroneous(statusCode,
"Path not found. Path='" + path + "'. "
+ "Check your ksql http url to make sure you are connecting to a ksql server."
);
}

if (statusCode == Code.UNAUTHORIZED) {
return RestResponse.erroneous(statusCode, unauthorizedErrorMsg());
}

if (statusCode == Code.FORBIDDEN) {
return RestResponse.erroneous(statusCode, forbiddenErrorMsg());
}

return RestResponse.erroneous(
statusCode,
"The server returned an unexpected error: "
+ response.getStatusInfo().getReasonPhrase());
}

private static Optional<KsqlErrorMessage> tryReadErrorMessage(final Response response) {
try {
return Optional.ofNullable(response.readEntity(KsqlErrorMessage.class));
} catch (final Exception e) {
return Optional.empty();
}
}

private static KsqlErrorMessage unauthorizedErrorMsg() {
return new KsqlErrorMessage(
Errors.ERROR_CODE_UNAUTHORIZED,
new AuthenticationException(
"Could not authenticate successfully with the supplied credentials.")
);
}

private static KsqlErrorMessage forbiddenErrorMsg() {
return new KsqlErrorMessage(
Errors.ERROR_CODE_FORBIDDEN,
new AuthenticationException("You are forbidden from using this cluster.")
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.client;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import java.util.function.Function;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.eclipse.jetty.http.HttpStatus.Code;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class KsqlClientUtilTest {

private static final String PATH = "/ksql";
private static final String ERROR_REASON = "something bad";

@Mock
private Response response;
@Mock
private Function<Response, KsqlEntityList> mapper;
@Mock
private KsqlEntityList entities;
@Mock
private KsqlErrorMessage errorMessage;
@Mock
private Response.StatusType statusInfo;

@Before
public void setUp() {
when(mapper.apply(response)).thenReturn(entities);
}

@Test
public void shouldCreateRestResponseFromSuccessfulResponse() {
// Given:
when(response.getStatus()).thenReturn(Status.OK.getStatusCode());

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is successful", restResponse.isSuccessful());
assertThat(restResponse.getStatusCode(), is(Code.OK));
assertThat(restResponse.getResponse(), sameInstance(entities));
}

@Test
public void shouldCreateRestResponseFromUnsuccessfulResponseWithMessage() {
// Given:
when(response.getStatus()).thenReturn(Status.BAD_REQUEST.getStatusCode());
when(response.readEntity(KsqlErrorMessage.class)).thenReturn(errorMessage);

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is erroneous", restResponse.isErroneous());
assertThat(restResponse.getStatusCode(), is(Code.BAD_REQUEST));
assertThat(restResponse.getErrorMessage(), is(errorMessage));
verify(mapper, never()).apply(any());
}

@Test
public void shouldCreateRestResponseFromNotFoundResponse() {
// Given:
when(response.getStatus()).thenReturn(Status.NOT_FOUND.getStatusCode());

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is erroneous", restResponse.isErroneous());
assertThat(restResponse.getStatusCode(), is(Code.NOT_FOUND));
assertThat(restResponse.getErrorMessage().getMessage(),
containsString("Check your ksql http url to make sure you are connecting to a ksql server"));
}

@Test
public void shouldCreateRestResponseFromUnauthorizedResponse() {
// Given:
when(response.getStatus()).thenReturn(Status.UNAUTHORIZED.getStatusCode());

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is erroneous", restResponse.isErroneous());
assertThat(restResponse.getStatusCode(), is(Code.UNAUTHORIZED));
assertThat(restResponse.getErrorMessage().getMessage(),
containsString("Could not authenticate successfully with the supplied credential"));
}

@Test
public void shouldCreateRestResponseFromForbiddenResponse() {
// Given:
when(response.getStatus()).thenReturn(Status.FORBIDDEN.getStatusCode());

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is erroneous", restResponse.isErroneous());
assertThat(restResponse.getStatusCode(), is(Code.FORBIDDEN));
assertThat(restResponse.getErrorMessage().getMessage(),
containsString("You are forbidden from using this cluster"));
}

@Test
public void shouldCreateRestResponseFromUnknownResponse() {
// Given:
when(response.getStatus()).thenReturn(Status.INTERNAL_SERVER_ERROR.getStatusCode());
when(response.getStatusInfo()).thenReturn(statusInfo);
when(statusInfo.getReasonPhrase()).thenReturn(ERROR_REASON);

// When:
final RestResponse<KsqlEntityList> restResponse =
KsqlClientUtil.toRestResponse(response, PATH, mapper);

// Then:
assertThat("is erroneous", restResponse.isErroneous());
assertThat(restResponse.getStatusCode(), is(Code.INTERNAL_SERVER_ERROR));
assertThat(restResponse.getErrorMessage().getMessage(),
containsString("The server returned an unexpected error"));
assertThat(restResponse.getErrorMessage().getMessage(),
containsString(ERROR_REASON));
}
}