-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Makes response codes rate limited as well as prints a message when it is hit #6701
Changes from 3 commits
7f1edb1
8673fec
48282a4
6836df9
1c0ec7a
be94664
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -612,11 +612,13 @@ property has the value `KSQL_PROCESSING_LOG`. | |
Toggles whether or not the processing log should include rows in log | ||
messages. By default, this property has the value `false`. | ||
|
||
### ksql.logging.server.skipped.response.codes | ||
### ksql.logging.server.rate.limited.response.codes | ||
|
||
A comma-separated list of HTTP response codes to skip during server | ||
request logging. This is useful for ignoring certain 4XX errors that you | ||
might not want to show up in the logs. | ||
A list of `path:rate_limit` pairs, to limit the rate of server request | ||
logging. This is useful for limiting certain 4XX errors that you | ||
AlanConfluent marked this conversation as resolved.
Show resolved
Hide resolved
|
||
might not want to blow up in the logs. | ||
This setting enables seeing the logs when the request rate is low | ||
and dropping them when they go over the threshold. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mention that a message will be logged (at most once every five seconds) if the threshold is hit? (And same for the other config below.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
### ksql.logging.server.rate.limited.request.paths | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're currently logging for internal endpoints (used in server-to-server communication for multi-node clusters), right? Would it make sense to disable logging for those by default? (I don't feel strongly one way or the other -- you have more context than I do about when/how often those endpoints are hit.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, currently we log all requests, regardless of internal/external status. I think this could be useful if we're trying to trace pull queries falling back on standbys after failures. We can see if this ends up being useful and potentially remove internal if not. |
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
package io.confluent.ksql.api.server; | ||
|
||
import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_REQUEST_PATHS_CONFIG; | ||
import static io.confluent.ksql.rest.server.KsqlRestConfig.KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
|
@@ -26,13 +27,23 @@ | |
import java.util.Map.Entry; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.function.Function; | ||
import org.slf4j.Logger; | ||
|
||
class LoggingRateLimiter { | ||
// Print "You hit a rate limit" every 5 seconds | ||
AlanConfluent marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private static final double LIMIT_HIT_LOG_RATE = 0.2; | ||
|
||
private final Map<String, Double> rateLimitedPaths; | ||
private final Map<Integer, Double> rateLimitedResponseCodes; | ||
|
||
private final Function<Double, RateLimiter> rateLimiterFactory; | ||
|
||
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>(); | ||
private final Map<String, RateLimiter> rateLimitersByPath = new ConcurrentHashMap<>(); | ||
private final Map<Integer, RateLimiter> rateLimitersByResponseCode = new ConcurrentHashMap<>(); | ||
|
||
// Rate limiters for printing the "You hit a rate limit" message | ||
private final RateLimiter pathLimitHit; | ||
private final RateLimiter responseCodeLimitHit; | ||
|
||
LoggingRateLimiter(final KsqlRestConfig ksqlRestConfig) { | ||
this(ksqlRestConfig, RateLimiter::create); | ||
|
@@ -45,13 +56,33 @@ class LoggingRateLimiter { | |
requireNonNull(ksqlRestConfig); | ||
this.rateLimiterFactory = requireNonNull(rateLimiterFactory); | ||
this.rateLimitedPaths = getRateLimitedRequestPaths(ksqlRestConfig); | ||
this.rateLimitedResponseCodes = getRateLimitedResponseCodes(ksqlRestConfig); | ||
this.pathLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); | ||
this.responseCodeLimitHit = rateLimiterFactory.apply(LIMIT_HIT_LOG_RATE); | ||
} | ||
|
||
public boolean shouldLog(final String path) { | ||
public boolean shouldLog(final Logger logger, final String path, final int responseCode) { | ||
if (rateLimitedPaths.containsKey(path)) { | ||
final double rateLimit = rateLimitedPaths.get(path); | ||
rateLimiters.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit)); | ||
return rateLimiters.get(path).tryAcquire(); | ||
rateLimitersByPath.computeIfAbsent(path, (k) -> rateLimiterFactory.apply(rateLimit)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be better to initialize the rate limiters up front, rather than calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, you're right. The data is static and not large, so why not just make an immutable map instead on initialization? I was originally thinking it might be dynamic, but it's not currently. |
||
if (!rateLimitersByPath.get(path).tryAcquire()) { | ||
if (pathLimitHit.tryAcquire()) { | ||
logger.info("Hit rate limit for path " + path + " with limit " + rateLimit); | ||
} | ||
return false; | ||
} | ||
} | ||
if (rateLimitedResponseCodes.containsKey(responseCode)) { | ||
final double rateLimit = rateLimitedResponseCodes.get(responseCode); | ||
rateLimitersByResponseCode.computeIfAbsent( | ||
responseCode, (k) -> rateLimiterFactory.apply(rateLimit)); | ||
if (!rateLimitersByResponseCode.get(responseCode).tryAcquire()) { | ||
if (responseCodeLimitHit.tryAcquire()) { | ||
logger.info("Hit rate limit for response code " + responseCode + " with limit " | ||
+ rateLimit); | ||
} | ||
return false; | ||
} | ||
} | ||
return true; | ||
} | ||
|
@@ -64,4 +95,12 @@ private static Map<String, Double> getRateLimitedRequestPaths(final KsqlRestConf | |
entry -> Double.parseDouble(entry.getValue()))); | ||
} | ||
|
||
private static Map<Integer, Double> getRateLimitedResponseCodes(final KsqlRestConfig config) { | ||
// Already validated as all ints | ||
AlanConfluent marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return config.getStringAsMap(KSQL_LOGGING_SERVER_RATE_LIMITED_RESPONSE_CODES_CONFIG) | ||
.entrySet().stream() | ||
.collect(ImmutableMap.toImmutableMap( | ||
entry -> Integer.parseInt(entry.getKey()), | ||
entry -> Double.parseDouble(entry.getValue()))); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarify the units of the rate limit (and also for the other config below). Appears to be number of messages per second?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I clarified it's qps and even explicitly write out 10 logs per second as an example.