-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(client): support admin operations in Java client (#5671)
- Loading branch information
Showing
12 changed files
with
756 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/StreamInfo.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
/** | ||
* Metadata for a ksqlDB stream. | ||
*/ | ||
public interface StreamInfo { | ||
|
||
/** | ||
* @return the name of this stream | ||
*/ | ||
String getName(); | ||
|
||
/** | ||
* @return the name of the Kafka topic underlying this ksqlDB stream | ||
*/ | ||
String getTopic(); | ||
|
||
/** | ||
* @return the format of the data in this stream | ||
*/ | ||
String getFormat(); | ||
|
||
} |
43 changes: 43 additions & 0 deletions
43
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/TableInfo.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
/** | ||
* Metadata for a ksqlDB table. | ||
*/ | ||
public interface TableInfo { | ||
|
||
/** | ||
* @return the name of this table | ||
*/ | ||
String getName(); | ||
|
||
/** | ||
* @return the name of the Kafka topic underlying this ksqlDB table | ||
*/ | ||
String getTopic(); | ||
|
||
/** | ||
* @return the format of the data in this table | ||
*/ | ||
String getFormat(); | ||
|
||
/** | ||
* @return whether this ksqlDB table is windowed | ||
*/ | ||
boolean isWindowed(); | ||
|
||
} |
43 changes: 43 additions & 0 deletions
43
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/TopicInfo.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Metadata for a Kafka topic available for use with ksqlDB. | ||
*/ | ||
public interface TopicInfo { | ||
|
||
/** | ||
* @return the name of this topic | ||
*/ | ||
String getName(); | ||
|
||
/** | ||
* @return the number of partitions for this topic | ||
*/ | ||
int getPartitions(); | ||
|
||
/** | ||
* Returns the number of replicas for each topic partition. | ||
* | ||
* @return a list with size equal to the number of partitions. Each element is the number of | ||
* replicas for the partition corresponding to the list index. | ||
*/ | ||
List<Integer> getReplicasPerPartition(); | ||
|
||
} |
98 changes: 98 additions & 0 deletions
98
ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AdminResponseHandlers.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client.impl; | ||
|
||
import io.confluent.ksql.api.client.StreamInfo; | ||
import io.confluent.ksql.api.client.TableInfo; | ||
import io.confluent.ksql.api.client.TopicInfo; | ||
import io.vertx.core.json.JsonArray; | ||
import io.vertx.core.json.JsonObject; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.stream.Collectors; | ||
|
||
final class AdminResponseHandlers { | ||
|
||
private AdminResponseHandlers() { | ||
} | ||
|
||
static void handleListStreamsResponse( | ||
final JsonObject streamsListEntity, | ||
final CompletableFuture<List<StreamInfo>> cf | ||
) { | ||
try { | ||
final JsonArray streams = streamsListEntity.getJsonArray("streams"); | ||
cf.complete(streams.stream() | ||
.map(o -> (JsonObject) o) | ||
.map(o -> new StreamInfoImpl( | ||
o.getString("name"), | ||
o.getString("topic"), | ||
o.getString("format"))) | ||
.collect(Collectors.toList()) | ||
); | ||
} catch (Exception e) { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + streamsListEntity)); | ||
} | ||
} | ||
|
||
static void handleListTablesResponse( | ||
final JsonObject tablesListEntity, | ||
final CompletableFuture<List<TableInfo>> cf | ||
) { | ||
try { | ||
final JsonArray tables = tablesListEntity.getJsonArray("tables"); | ||
cf.complete(tables.stream() | ||
.map(o -> (JsonObject) o) | ||
.map(o -> new TableInfoImpl( | ||
o.getString("name"), | ||
o.getString("topic"), | ||
o.getString("format"), | ||
o.getBoolean("isWindowed"))) | ||
.collect(Collectors.toList()) | ||
); | ||
} catch (Exception e) { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + tablesListEntity)); | ||
} | ||
} | ||
|
||
static void handleListTopicsResponse( | ||
final JsonObject kafkaTopicsListEntity, | ||
final CompletableFuture<List<TopicInfo>> cf | ||
) { | ||
try { | ||
final JsonArray topics = kafkaTopicsListEntity.getJsonArray("topics"); | ||
cf.complete(topics.stream() | ||
.map(o -> (JsonObject) o) | ||
.map(o -> { | ||
final List<Integer> replicaInfo = o.getJsonArray("replicaInfo").stream() | ||
.map(v -> (Integer)v) | ||
.collect(Collectors.toList()); | ||
return new TopicInfoImpl( | ||
o.getString("name"), | ||
replicaInfo.size(), | ||
replicaInfo); | ||
}) | ||
.collect(Collectors.toList()) | ||
); | ||
} catch (Exception e) { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + kafkaTopicsListEntity)); | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.