-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: classify authorization exception as user error #7061
Changes from 1 commit
bf8f3c6
3bf546f
b27225a
4adaf97
4c801e0
2cb2e1a
08b1f72
d0fdb9b
a065ecf
1bfee24
7acea03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Add integration test for error classification
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.query; | ||
|
||
import io.confluent.ksql.query.QueryError.Type; | ||
import java.util.Objects; | ||
import org.apache.kafka.common.errors.GroupAuthorizationException; | ||
import org.apache.kafka.streams.errors.StreamsException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* {@code GroupAuthorizationClassifier} classifies missing consumer group ACLs as user error | ||
*/ | ||
public class GroupAuthorizationClassifier implements QueryErrorClassifier { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(GroupAuthorizationClassifier.class); | ||
|
||
private final String queryId; | ||
|
||
public GroupAuthorizationClassifier(final String queryId) { | ||
this.queryId = Objects.requireNonNull(queryId, "queryId"); | ||
} | ||
|
||
@Override | ||
public Type classify(final Throwable e) { | ||
final Type type = e instanceof GroupAuthorizationException ? Type.USER : Type.UNKNOWN; | ||
|
||
if (type == Type.USER) { | ||
LOG.info( | ||
"Classified error as USER error based on missing consumer groups access rights. Query ID: {} Exception: {}", | ||
queryId, | ||
e); | ||
} | ||
|
||
return type; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,8 @@ | |
import static org.apache.kafka.common.resource.ResourceType.CLUSTER; | ||
import static org.apache.kafka.common.resource.ResourceType.GROUP; | ||
import static org.apache.kafka.common.resource.ResourceType.TOPIC; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.contains; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
|
@@ -44,6 +46,8 @@ | |
import io.confluent.ksql.engine.KsqlEngineTestUtil; | ||
import io.confluent.ksql.function.InternalFunctionRegistry; | ||
import io.confluent.ksql.logging.processing.ProcessingLogContext; | ||
import io.confluent.ksql.query.QueryError; | ||
import io.confluent.ksql.query.QueryError.Type; | ||
import io.confluent.ksql.query.QueryId; | ||
import io.confluent.ksql.query.id.SequentialQueryIdGenerator; | ||
import io.confluent.ksql.services.DisabledKsqlClient; | ||
|
@@ -60,6 +64,8 @@ | |
import io.confluent.ksql.util.OrderDataProvider; | ||
import io.confluent.ksql.util.PersistentQueryMetadata; | ||
import io.confluent.ksql.util.QueryMetadata; | ||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
@@ -74,8 +80,11 @@ | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
import org.apache.kafka.common.acl.AclOperation; | ||
import org.apache.kafka.common.acl.AclPermissionType; | ||
import org.apache.kafka.common.errors.GroupAuthorizationException; | ||
import org.apache.kafka.common.errors.TopicAuthorizationException; | ||
import org.apache.kafka.common.resource.ResourcePattern; | ||
import org.apache.kafka.common.security.auth.SecurityProtocol; | ||
import org.apache.kafka.streams.errors.MissingSourceTopicException; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.ClassRule; | ||
|
@@ -243,6 +252,148 @@ public void shouldWorkWithMinimalPrefixedAcls() { | |
assertCanAccessClusterConfig(prefix); | ||
} | ||
|
||
@Test | ||
public void shouldClassifyMissingSourceTopicExceptionAsUserError() { | ||
// Given: | ||
final String serviceId = "my-service-id_"; // Defaults to "default_" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These new tests are hard to read since it's not clear to me which parts of the "givens" are relevant for the test and which are just boilerplate. Can we refactor the common parts/boilerplate into a helper method (e.g., |
||
final String prefix = "_confluent-ksql-" + serviceId; | ||
|
||
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER); | ||
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(CLUSTER, "kafka-cluster"), | ||
ops(DESCRIBE_CONFIGS)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, INPUT_TOPIC), | ||
ops(READ)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, outputTopic), | ||
ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
prefixedResource(TOPIC, prefix), | ||
ops(ALL)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
prefixedResource(GROUP, prefix), | ||
ops(ALL)); | ||
|
||
givenTestSetupWithConfig(ksqlConfig); | ||
|
||
topicClient.deleteTopics(Collections.singleton(INPUT_TOPIC)); | ||
|
||
// Then: | ||
assertQueryFailsWithUserError( | ||
String.format( | ||
"CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", | ||
outputTopic, | ||
INPUT_STREAM | ||
), | ||
String.format( | ||
"%s: One or more source topics were missing during rebalance", | ||
MissingSourceTopicException.class.getName() | ||
) | ||
); | ||
} | ||
|
||
@Test | ||
public void shouldClassifyTopicAuthorizationExceptionAsUserError() { | ||
// Given: | ||
final String serviceId = "my-service-id_"; // Defaults to "default_" | ||
final String prefix = "_confluent-ksql-" + serviceId; | ||
|
||
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER); | ||
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(CLUSTER, "kafka-cluster"), | ||
ops(DESCRIBE_CONFIGS)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, INPUT_TOPIC), | ||
ops(READ)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
prefixedResource(TOPIC, prefix), | ||
ops(ALL)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, outputTopic), | ||
ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
prefixedResource(GROUP, prefix), | ||
ops(ALL)); | ||
|
||
givenTestSetupWithConfig(ksqlConfig); | ||
|
||
TEST_HARNESS.getKafkaCluster().addUserAcl( | ||
NORMAL_USER.username, | ||
AclPermissionType.DENY, | ||
resource(TOPIC, INPUT_TOPIC), | ||
ops(READ) | ||
); | ||
|
||
// Then: | ||
assertQueryFailsWithUserError( | ||
String.format( | ||
"CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", | ||
outputTopic, | ||
INPUT_STREAM | ||
), | ||
String.format( | ||
"%s: Not authorized to access topics: [%s]", | ||
TopicAuthorizationException.class.getName(), | ||
INPUT_TOPIC | ||
) | ||
); | ||
} | ||
|
||
@Test | ||
public void shouldClassifyGroupAuthorizationExceptionAsUserError() { | ||
// Given: | ||
final String serviceId = "my-service-id_"; // Defaults to "default_" | ||
final String prefix = "_confluent-ksql-" + serviceId; | ||
|
||
final Map<String, Object> ksqlConfig = getKsqlConfig(NORMAL_USER); | ||
ksqlConfig.put(KSQL_SERVICE_ID_CONFIG, serviceId); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(CLUSTER, "kafka-cluster"), | ||
ops(DESCRIBE_CONFIGS)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, INPUT_TOPIC), | ||
ops(READ)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
resource(TOPIC, outputTopic), | ||
ops(CREATE /* as the topic doesn't exist yet*/, WRITE)); | ||
|
||
givenAllowAcl(NORMAL_USER, | ||
prefixedResource(TOPIC, prefix), | ||
ops(ALL)); | ||
|
||
givenTestSetupWithConfig(ksqlConfig); | ||
|
||
// Then: | ||
assertQueryFailsWithUserError( | ||
String.format( | ||
"CREATE STREAM %s WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM %s;", | ||
outputTopic, | ||
INPUT_STREAM | ||
), | ||
String.format( | ||
"%s: Not authorized to access group: %squery_", | ||
GroupAuthorizationException.class.getName(), | ||
prefix | ||
) + "%s" | ||
); | ||
} | ||
|
||
// Requires correctly configured schema-registry running | ||
//@Test | ||
@SuppressWarnings("unused") | ||
|
@@ -321,6 +472,29 @@ private void assertCanRunKsqlQuery( | |
TEST_HARNESS.verifyAvailableRecords(outputTopic, greaterThan(0)); | ||
} | ||
|
||
private void assertQueryFailsWithUserError( | ||
final String query, | ||
final String errorMsg | ||
) { | ||
final QueryMetadata queryMetadata = KsqlEngineTestUtil | ||
.execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); | ||
|
||
queryMetadata.start(); | ||
assertThatEventually( | ||
"", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: empty error message. |
||
() -> queryMetadata.getQueryErrors().size() > 0, | ||
is(true) | ||
); | ||
|
||
for (final QueryError error : queryMetadata.getQueryErrors()) { | ||
assertThat(error.getType(), is(Type.USER)); | ||
assertThat( | ||
error.getErrorMessage().split("\n")[0], | ||
is(String.format(errorMsg, queryMetadata.getQueryId())) | ||
); | ||
} | ||
} | ||
|
||
private static Map<String, Object> getBaseKsqlConfig() { | ||
final Map<String, Object> configs = new HashMap<>(KsqlConfigTestUtil.baseTestConfig()); | ||
configs.put( | ||
|
@@ -382,6 +556,6 @@ private void executePersistentQuery(final String queryString, | |
.execute(serviceContext, ksqlEngine, query, ksqlConfig, Collections.emptyMap()).get(0); | ||
|
||
queryMetadata.start(); | ||
queryId = ((PersistentQueryMetadata) queryMetadata).getQueryId(); | ||
queryId = queryMetadata.getQueryId(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.query; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.is; | ||
|
||
import io.confluent.ksql.query.QueryError.Type; | ||
import org.apache.kafka.common.errors.GroupAuthorizationException; | ||
import org.apache.kafka.streams.errors.StreamsException; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.junit.MockitoJUnitRunner; | ||
|
||
@RunWith(MockitoJUnitRunner.class) | ||
public class GroupAuthorizationClassifierTest { | ||
|
||
@Test | ||
public void shouldClassifyTopicAuthorizationExceptionAsUserError() { | ||
mjsax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Given: | ||
final Exception e = new StreamsException(new GroupAuthorizationException("foo")); | ||
|
||
// When: | ||
final Type type = new GroupAuthorizationClassifier("").classify(e); | ||
|
||
// Then: | ||
assertThat(type, is(Type.USER)); | ||
} | ||
|
||
@Test | ||
public void shouldClassifyNoGroupAuthorizationExceptionAsUnknownError() { | ||
// Given: | ||
final Exception e = new Exception("foo"); | ||
|
||
// When: | ||
final Type type = new GroupAuthorizationClassifier("").classify(e); | ||
|
||
// Then: | ||
assertThat(type, is(Type.UNKNOWN)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated to this PR but I'm curious: what happens if an internal (changelog or repartition) topic is missing, or if a sink topic is missing? Does Streams throw a different type of exception in these cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Atm, for changelog and repartition topics they would be created. We would fail if the expected config does not match. (this will "change" with https://cwiki.apache.org/confluence/display/KAFKA/KIP-698%3A+Add+Explicit+User+Initialization+of+Broker-side+State+to+Kafka+Streams) -- Of course, it's only verified in a rebalance, but checking the source topic is also only done during a rebalance.
For output topics, Kafka Streams won't do anything, and thus the producer would fill up its write buffer and eventually block. So eventually Streams would crash.
But as ksqlDB checks for output topic, it should be ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is true for internal topics too right? It seems like right now for internal topics that are deleted we'd block for max.block.ms and then throw an error classified as SYSTEM. Then, retry and recreate. After KIP-698 we'd block and then throw an error we will classify as USER on every retry.
Where do we check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Not sure how the error would be classified atm or after KIP-698. But it might be out-of-scope for this PR. Would like to focus on authorization errors for now.
Doesn't ksqlDB create output topics explicitly if they don't exist?