Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add an endpoint for returning the query limit configuration #6353

Merged
merged 5 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Add an endpoint for returning the query limit configuration
  • Loading branch information
Zara Lim committed Oct 2, 2020
commit 95e4a3b43d9eca2b18fc7995ddc3d3a3a06852e5
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -201,7 +202,10 @@ private Router setupRouter() {
.produces(Versions.KSQL_V1_JSON)
.produces(JSON_CONTENT_TYPE)
.handler(this::handleWebsocket);

router.route(HttpMethod.GET, "/v1/configs")
.produces(Versions.KSQL_V1_JSON)
.produces(JSON_CONTENT_TYPE)
.handler(this::handleConfigRequest);
return router;
}

Expand Down Expand Up @@ -321,6 +325,15 @@ private void handleWebsocket(final RoutingContext routingContext) {
server.getWorkerExecutor(), apiSecurityContext);
}

private void handleConfigRequest(final RoutingContext routingContext) {
final List<String> requestedConfigs = routingContext.queryParam("name");
handleOldApiRequest(server, routingContext, null,
(request, apiSecurityContext) ->
endpoints
.executeConfig(requestedConfigs, DefaultApiSecurityContext.create(routingContext))
);
}

private static void chcHandler(final RoutingContext routingContext) {
routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE.toString(), "application/json")
.end(new JsonObject().toBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -104,6 +105,9 @@ CompletableFuture<EndpointResponse> executeLagReport(LagReportingMessage lagRepo
CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
ApiSecurityContext apiSecurityContext);

CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs, ApiSecurityContext apiSecurityContext);

// This is the legacy websocket based query streaming API
void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.ConfigResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
Expand Down Expand Up @@ -181,6 +182,7 @@ public final class KsqlRestApplication implements Executable {
private final HealthCheckResource healthCheckResource;
private volatile ServerMetadataResource serverMetadataResource;
private volatile WSQueryEndpoint wsQueryEndpoint;
private final ConfigResource configResource;
@SuppressWarnings("UnstableApiUsage")
private volatile ListeningScheduledExecutorService oldApiWebsocketExecutor;
private final Vertx vertx;
Expand Down Expand Up @@ -275,6 +277,7 @@ public static SourceName getCommandsStreamName() {
this.restConfig,
this.ksqlConfigNoPort);
this.queryMonitor = requireNonNull(ksqlQueryMonitor, "ksqlQueryMonitor");
this.configResource = new ConfigResource(ksqlConfig);
MetricCollectors.addConfigurableReporter(ksqlConfigNoPort);
log.debug("ksqlDB API server instance created");
}
Expand Down Expand Up @@ -334,7 +337,8 @@ public void startAsync() {
lagReportingResource,
healthCheckResource,
serverMetadataResource,
wsQueryEndpoint
wsQueryEndpoint,
configResource
);
apiServer = new Server(vertx, ksqlRestConfig, endpoints, securityExtension,
authenticationPlugin, serverState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.ConfigResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
import io.confluent.ksql.rest.server.resources.KsqlResource;
Expand All @@ -51,6 +52,7 @@
import io.vertx.core.WorkerExecutor;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -77,6 +79,7 @@ public class KsqlServerEndpoints implements Endpoints {
private final HealthCheckResource healthCheckResource;
private final ServerMetadataResource serverMetadataResource;
private final WSQueryEndpoint wsQueryEndpoint;
private final ConfigResource configResource;

// CHECKSTYLE_RULES.OFF: ParameterNumber
public KsqlServerEndpoints(
Expand All @@ -93,7 +96,8 @@ public KsqlServerEndpoints(
final Optional<LagReportingResource> lagReportingResource,
final HealthCheckResource healthCheckResource,
final ServerMetadataResource serverMetadataResource,
final WSQueryEndpoint wsQueryEndpoint) {
final WSQueryEndpoint wsQueryEndpoint,
final ConfigResource configResource) {

// CHECKSTYLE_RULES.ON: ParameterNumber
this.ksqlEngine = Objects.requireNonNull(ksqlEngine);
Expand All @@ -111,6 +115,7 @@ public KsqlServerEndpoints(
this.healthCheckResource = Objects.requireNonNull(healthCheckResource);
this.serverMetadataResource = Objects.requireNonNull(serverMetadataResource);
this.wsQueryEndpoint = Objects.requireNonNull(wsQueryEndpoint);
this.configResource = Objects.requireNonNull(configResource);
}

@Override
Expand Down Expand Up @@ -245,6 +250,19 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
ksqlSecurityContext -> serverMetadataResource.getServerClusterId());
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
final List<String> requestedConfigs,
final ApiSecurityContext apiSecurityContext) {
if (requestedConfigs.size() == 0) {
return executeOldApiEndpoint(apiSecurityContext,
ksqlSecurityContext -> configResource.getAllConfigs());
} else {
return executeOldApiEndpoint(apiSecurityContext,
ksqlSecurityContext -> configResource.getConfigs(requestedConfigs));
}
}

@Override
public void executeWebsocketStream(final ServerWebSocket webSocket, final MultiMap requestParams,
final WorkerExecutor workerExecutor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.util.KsqlConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigResource {
private final Map<String, Object> allowedConfigs = new HashMap<>();
jzaralim marked this conversation as resolved.
Show resolved Hide resolved

public ConfigResource(final KsqlConfig ksqlConfig) {
setAllowedConfigs(ksqlConfig);
}

private void setAllowedConfigs(final KsqlConfig ksqlConfig) {
allowedConfigs.put(
KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG,
ksqlConfig.getInt(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
);
}

public EndpointResponse getConfigs(final List<String> requestedConfigs) {
final Map<String, Object> configs = new HashMap<>();
for (String config : requestedConfigs) {
if (allowedConfigs.containsKey(config)) {
configs.put(config, allowedConfigs.get(config));
}
}
return EndpointResponse.ok(new ConfigResponse(configs));
}

public EndpointResponse getAllConfigs() {
return EndpointResponse.ok(new ConfigResponse(allowedConfigs));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs, ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.codec.BodyCodec;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -247,6 +248,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ public CompletableFuture<EndpointResponse> executeServerMetadataClusterId(
return null;
}

@Override
public CompletableFuture<EndpointResponse> executeConfig(
List<String> requestedConfigs,
ApiSecurityContext apiSecurityContext) {
return null;
}

@Override
public void executeWebsocketStream(ServerWebSocket webSocket, MultiMap requstParams,
WorkerExecutor workerExecutor, ApiSecurityContext apiSecurityContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.ServerClusterId;
import io.confluent.ksql.rest.entity.ServerInfo;
Expand All @@ -62,6 +63,7 @@
import io.confluent.ksql.test.util.secure.ClientTrustStore;
import io.confluent.ksql.test.util.secure.Credentials;
import io.confluent.ksql.test.util.secure.SecureKafkaHelper;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PageViewDataProvider;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
Expand Down Expand Up @@ -289,6 +291,28 @@ public void shouldExecuteServerMetadataIdRequest() {
assertThat(response, is(notNullValue()));
}

@Test
public void shouldExecuteAllConfigsRequest() {
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
// When:
final ConfigResponse response = RestIntegrationTestUtil.makeConfigRequest(REST_APP);

// Then:
assertThat(response.getConfigs().get(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG), is(notNullValue()));
}

@Test
public void shouldExecuteConfigRequest() {
// When:
final ConfigResponse response = RestIntegrationTestUtil.makeConfigRequest(
REST_APP,
Arrays.asList(new String[] {"foo", KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG})
);

// Then:
assertThat(response.getConfigs().keySet().size(), is(1));
assertThat(response.getConfigs().get(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG), is(notNullValue()));
}

@Test
public void shouldExecuteRootDocumentRequest() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -156,6 +157,31 @@ static ServerClusterId makeServerMetadataIdRequest(final TestKsqlRestApp restApp
}
}

static ConfigResponse makeConfigRequest(final TestKsqlRestApp restApp) {
try (final KsqlRestClient restClient = restApp.buildKsqlClient(Optional.empty())) {

final RestResponse<ConfigResponse> res = restClient.makeConfigRequest();

throwOnError(res);

return res.getResponse();
}
}

static ConfigResponse makeConfigRequest(
final TestKsqlRestApp restApp,
final List<String> requestedConfigs
) {
try (final KsqlRestClient restClient = restApp.buildKsqlClient(Optional.empty())) {

final RestResponse<ConfigResponse> res = restClient.makeConfigRequest(requestedConfigs);

throwOnError(res);

return res.getResponse();
}
}

static List<StreamedRow> makeQueryRequest(
final TestKsqlRestApp restApp,
final String sql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.ConfigResponse;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HeartbeatResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
Expand Down Expand Up @@ -149,6 +150,14 @@ public RestResponse<ClusterStatusResponse> makeClusterStatusRequest() {
return target().getClusterStatus();
}

public RestResponse<ConfigResponse> makeConfigRequest() {
return target().getConfigRequest();
}

public RestResponse<ConfigResponse> makeConfigRequest(final List<String> requestedConfigs) {
return target().getConfigRequest(requestedConfigs);
}

public CompletableFuture<RestResponse<LagReportingResponse>> makeAsyncLagReportingRequest(
final LagReportingMessage lagReportingMessage
) {
Expand Down
Loading