Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Makes response codes rate limited as well as prints a message when it is hit #6701

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,13 @@ property has the value `KSQL_PROCESSING_LOG`.
Toggles whether or not the processing log should include rows in log
messages. By default, this property has the value `false`.

### ksql.logging.server.skipped.response.codes
### ksql.logging.server.rate.limited.response.codes

A comma-separated list of HTTP response codes to skip during server
request logging. This is useful for ignoring certain 4XX errors that you
might not want to show up in the logs.
A list of `path:rate_limit` pairs, to limit the rate of server request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify the units of the rate limit (and also for the other config below). Appears to be number of messages per second?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I clarified it's qps and even explicitly write out 10 logs per second as an example.

logging. This is useful for limiting certain 4XX errors that you
AlanConfluent marked this conversation as resolved.
Show resolved Hide resolved
might not want to blow up in the logs.
This setting enables seeing the logs when the request rate is low
and dropping them when they go over the threshold.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that a message will be logged (at most once every five seconds) if the threshold is hit? (And same for the other config below.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


### ksql.logging.server.rate.limited.request.paths
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're currently logging for internal endpoints (used in server-to-server communication for multi-node clusters), right? Would it make sense to disable logging for those by default? (I don't feel strongly one way or the other -- you have more context than I do about when/how often those endpoints are hit.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, currently we log all requests, regardless of internal/external status. I think this could be useful if we're trying to trace pull queries falling back on standbys after failures. We can see if this ends up being useful and potentially remove internal if not.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,29 @@ public static Validator intList() {
};
}

public static Validator mapWithIntKeyDoubleValue() {
return (name, val) -> {
if (!(val instanceof String)) {
throw new ConfigException(name, val, "Must be a string");
}

final String str = (String) val;
final Map<String, String> map = KsqlConfig.parseStringAsMap(name, str);
map.forEach((keyStr, valueStr) -> {
try {
Integer.parseInt(keyStr);
} catch (NumberFormatException e) {
throw new ConfigException(name, keyStr, "Not an int");
}
try {
Double.parseDouble(valueStr);
} catch (NumberFormatException e) {
throw new ConfigException(name, valueStr, "Not a double");
}
});
};
}

public static Validator mapWithDoubleValue() {
return (name, val) -> {
if (!(val instanceof String)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,58 @@ public void shouldThrowOnNoStringRegexList() {
assertThat(e.getMessage(), containsString("validator should only be used with LIST of STRING defs"));
}

@Test
public void shouldParseDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithDoubleValue();
validator.ensureValid("propName", "foo:1.2,bar:3");
}

@Test
public void shouldParseIntKeyDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue();
validator.ensureValid("propName", "123:1.2,345:9.0");
}

@Test
public void shouldThrowOnBadDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithDoubleValue();

// When:
final Exception e = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "foo:abc")
);

// Then:
assertThat(e.getMessage(),
containsString("Invalid value abc for configuration propName: Not a double"));
}

@Test
public void shouldThrowOnBadIntDoubleValueInMap() {
// Given:
final Validator validator = ConfigValidators.mapWithIntKeyDoubleValue();

// When:
final Exception e = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "1:abc")
);
final Exception e2 = assertThrows(
ConfigException.class,
() -> validator.ensureValid("propName", "abc:1.2")
);

// Then:
assertThat(e.getMessage(),
containsString("Invalid value abc for configuration propName: Not a double"));
assertThat(e2.getMessage(),
containsString("Invalid value abc for configuration propName: Not an int"));
}

private enum TestEnum {
FOO, BAR
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,17 @@

package io.confluent.ksql.api.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import io.confluent.ksql.api.auth.ApiUser;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.impl.Utils;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +34,10 @@ public class LoggingHandler implements Handler<RoutingContext> {
private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class);
static final String HTTP_HEADER_USER_AGENT = "User-Agent";

private final Set<Integer> skipResponseCodes;
private final Logger logger;
private final Clock clock;
private final LoggingRateLimiter loggingRateLimiter;

private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();

public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateLimiter) {
this(server, loggingRateLimiter, LOG, Clock.systemUTC());
}
Expand All @@ -60,7 +50,6 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL
final Clock clock) {
requireNonNull(server);
this.loggingRateLimiter = requireNonNull(loggingRateLimiter);
this.skipResponseCodes = getSkipResponseCodes(server.getConfig());
this.logger = logger;
this.clock = clock;
}
Expand All @@ -69,17 +58,14 @@ public LoggingHandler(final Server server, final LoggingRateLimiter loggingRateL
public void handle(final RoutingContext routingContext) {
routingContext.addEndHandler(ar -> {
// After the response is complete, log results here.
if (skipResponseCodes.contains(routingContext.response().getStatusCode())) {
return;
}
if (!loggingRateLimiter.shouldLog(routingContext.request().path())) {
final int status = routingContext.request().response().getStatusCode();
if (!loggingRateLimiter.shouldLog(logger, routingContext.request().path(), status)) {
return;
}
final long contentLength = routingContext.request().response().bytesWritten();
final HttpVersion version = routingContext.request().version();
final HttpMethod method = routingContext.request().method();
final String uri = routingContext.request().uri();
final int status = routingContext.request().response().getStatusCode();
final long requestBodyLength = routingContext.request().bytesRead();
final String versionFormatted;
switch (version) {
Expand Down Expand Up @@ -118,13 +104,6 @@ public void handle(final RoutingContext routingContext) {
routingContext.next();
}

private static Set<Integer> getSkipResponseCodes(final KsqlRestConfig config) {
// Already validated as all ints
return config.getList(KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG)
.stream()
.map(Integer::parseInt).collect(ImmutableSet.toImmutableSet());
}

private void doLog(final int status, final String message) {
if (status >= 500) {
logger.error(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.api.server;

import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG;
import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -26,13 +27,23 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;

class LoggingRateLimiter {
// Print "You hit a rate limit" every 5 seconds
AlanConfluent marked this conversation as resolved.
Show resolved Hide resolved
private static final double LIMIT_HIT_LOG_RATE = 0.2;

private final Map<String, Double> rateLimitedPaths;
private final Map<Integer, Double> rateLimitedResponseCodes;

private final Function<Double, RateLimiter> rateLimiterFactory;

private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
private final Map<String, RateLimiter> rateLimitersByPath = new ConcurrentHashMap<>();
private final Map<Integer, RateLimiter> rateLimitersByResponseCode = new ConcurrentHashMap<>();

// Rate limiters for printing the "You hit a rate limit" message
private final RateLimiter pathLimitHit;
private final RateLimiter responseCodeLimitHit;

LoggingRateLimiter(final KsqlRestConfig ksqlRestConfig) {
this(ksqlRestConfig, RateLimiter::create);
Expand All @@ -45,13 +56,33 @@ class LoggingRateLimiter {
requireNonNull(ksqlRestConfig);
this.rateLimiterFactory = requireNonNull(rateLimiterFactory);
this.rateLimitedPaths = getRateLimitedRequestPaths(ksqlRestConfig);
this.rateLimitedResponseCodes = getRateLimitedResponseCodes(ksqlRestConfig);
this.pathLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE);
this.responseCodeLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE);
}

public boolean shouldLog(final String path) {
public boolean shouldLog(final Logger logger, final String path, final int responseCode) {
if (rateLimitedPaths.containsKey(path)) {
final double rateLimit = rateLimitedPaths.get(path);
rateLimiters.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit));
return rateLimiters.get(path).tryAcquire();
rateLimitersByPath.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to initialize the rate limiters up front, rather than calling computeIfAbsent on each request? I don't have a sense of how large this optimization is, feels minor but I also feel it can't hurt. Feel free to disagree.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right. The data is static and not large, so why not just make an immutable map instead on initialization?

I was originally thinking it might be dynamic, but it's not currently.

if (!rateLimitersByPath.get(path).tryAcquire()) {
if (pathLimitHit.tryAcquire()) {
logger.info("Hit rate limit for path " + path + " with limit " + rateLimit);
}
return false;
}
}
if (rateLimitedResponseCodes.containsKey(responseCode)) {
final double rateLimit = rateLimitedResponseCodes.get(responseCode);
rateLimitersByResponseCode.computeIfAbsent(
responseCode, (k) -> rateLimiterFactory.apply(rateLimit));
if (!rateLimitersByResponseCode.get(responseCode).tryAcquire()) {
if (responseCodeLimitHit.tryAcquire()) {
logger.info("Hit rate limit for response code " + responseCode + " with limit "
+ rateLimit);
}
return false;
}
}
return true;
}
Expand All @@ -64,4 +95,12 @@ private static Map<String, Double> getRateLimitedRequestPaths(final KsqlRestConf
entry -> Double.parseDouble(entry.getValue())));
}

private static Map<Integer, Double> getRateLimitedResponseCodes(final KsqlRestConfig config) {
// Already validated as all ints
AlanConfluent marked this conversation as resolved.
Show resolved Hide resolved
return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG)
.entrySet().stream()
.collect(ImmutableMap.toImmutableMap(
entry -> Integer.parseInt(entry.getKey()),
entry -> Double.parseDouble(entry.getValue())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

package io.confluent.ksql.rest.server;

import static io.confluent.ksql.configdef.ConfigValidators.intList;
import static io.confluent.ksql.configdef.ConfigValidators.mapWithDoubleValue;
import static io.confluent.ksql.configdef.ConfigValidators.mapWithIntKeyDoubleValue;
import static io.confluent.ksql.configdef.ConfigValidators.oneOrMore;
import static io.confluent.ksql.configdef.ConfigValidators.zeroOrPositive;

Expand Down Expand Up @@ -333,10 +333,10 @@ public class KsqlRestConfig extends AbstractConfig {
"The key store certificate alias to be used for internal client requests. If not set, "
+ "the system will fall back on the Vert.x default choice";

public static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.skipped.response.codes";
private static final String KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC =
"A list of HTTP response codes to skip during server request logging";
public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.rate.limited.response.codes";
private static final String KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC =
"A list of code:rate_limit pairs, to rate limit the server request logging";

public static final String KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG =
KSQL_CONFIG_PREFIX + "logging.server.rate.limited.request.paths";
Expand Down Expand Up @@ -640,12 +640,12 @@ public class KsqlRestConfig extends AbstractConfig {
ConfigDef.Importance.LOW,
KSQL_AUTHENTICATION_PLUGIN_DOC
).define(
KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_CONFIG,
Type.LIST,
KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG,
Type.STRING,
"",
intList(),
mapWithIntKeyDoubleValue(),
ConfigDef.Importance.LOW,
KSQL_LOGGING_SERVER_SKIPPED_RESPONSE_CODES_DOC
KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_DOC
).define(
KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void setUp() {
when(request.response()).thenReturn(response);
when(request.remoteAddress()).thenReturn(socketAddress);
when(ksqlRestConfig.getList(any())).thenReturn(ImmutableList.of("401"));
when(loggingRateLimiter.shouldLog("/query")).thenReturn(true);
when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true);
when(loggingRateLimiter.shouldLog(logger, "/query", 405)).thenReturn(true);
when(clock.millis()).thenReturn(1699813434333L);
when(response.bytesWritten()).thenReturn(5678L);
when(request.path()).thenReturn("/query");
Expand Down Expand Up @@ -117,6 +118,7 @@ public void shouldProduceLog_warn() {
public void shouldSkipLog() {
// Given:
when(response.getStatusCode()).thenReturn(401);
when(loggingRateLimiter.shouldLog(logger, "/query", 401)).thenReturn(false);

// When:
loggingHandler.handle(routingContext);
Expand All @@ -133,7 +135,7 @@ public void shouldSkipLog() {
public void shouldSkipRateLimited() {
// Given:
when(response.getStatusCode()).thenReturn(200);
when(loggingRateLimiter.shouldLog("/query")).thenReturn(true, true, false, false);
when(loggingRateLimiter.shouldLog(logger, "/query", 200)).thenReturn(true, true, false, false);

// When:
loggingHandler.handle(routingContext);
Expand Down
Loading