Skip to content

Commit

Permalink
Issue resilience4j#1228: Added onResult and onError hook to RateLimit…
Browse files Browse the repository at this point in the history
…er and drainPermissionsOnResult handler (resilience4j#1264)

Co-authored-by: Adam Walczak <adam.walczak@ewejsciowki.pl>
  • Loading branch information
walec51 and Adam Walczak committed Jan 11, 2021
1 parent 1a2ed10 commit 4387316
Show file tree
Hide file tree
Showing 7 changed files with 471 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
*
* Copyright 2016 Robert Winkler and Bohdan Storozhuk
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package io.github.resilience4j.core;

import io.vavr.control.Either;

import java.util.function.Function;

public class ResultUtils {

public static <T> boolean isSuccessfulAndReturned(
Either<? extends Throwable, ?> callsResult,
Class<T> expectedClass,
Function<T, Boolean> returnedChecker) {
if (callsResult.isLeft()) {
return false;
}
Object result = callsResult.get();
if (result == null) {
return false;
}
if (!expectedClass.isAssignableFrom(result.getClass())) {
return false;
}
return returnedChecker.apply((T) result);
}

public static <T extends Throwable> boolean isFailedAndThrown(
Either<? extends Throwable, ?> callsResult,
Class<T> expectedClass) {
return isFailedAndThrown(callsResult, expectedClass, thrown -> true);
}

public static <T extends Throwable> boolean isFailedAndThrown(
Either<? extends Throwable, ?> callsResult,
Class<T> expectedClass,
Function<T, Boolean> thrownChecker) {
if (callsResult.isRight()) {
return false;
}
Throwable thrown = callsResult.getLeft();
if (!expectedClass.isAssignableFrom(thrown.getClass())) {
return false;
}
return thrownChecker.apply((T) thrown);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@
import io.vavr.control.Try;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -141,13 +138,18 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rate
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
rateLimiter.onError(throwable);
promise.completeExceptionally(throwable);
} else {
rateLimiter.onResult(result);
promise.complete(result);
}
}
);
} catch (RequestNotPermitted requestNotPermitted) {
promise.completeExceptionally(requestNotPermitted);
} catch (Exception exception) {
rateLimiter.onError(exception);
promise.completeExceptionally(exception);
}
return promise;
Expand Down Expand Up @@ -185,7 +187,14 @@ static <T, F extends Future<T>> Supplier<F> decorateFuture(
) {
return () -> {
waitForPermission(rateLimiter, permits);
return supplier.get();
try {
F result = supplier.get();
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -215,7 +224,14 @@ static <T> CheckedFunction0<T> decorateCheckedSupplier(RateLimiter rateLimiter,
CheckedFunction0<T> supplier) {
return () -> {
waitForPermission(rateLimiter, permits);
return supplier.apply();
try {
T result = supplier.apply();
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -245,7 +261,13 @@ static CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, int perm

return () -> {
waitForPermission(rateLimiter, permits);
runnable.run();
try {
runnable.run();
rateLimiter.onSuccess();
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -277,7 +299,14 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(RateLimiter rateLim
int permits, CheckedFunction1<T, R> function) {
return (T t) -> {
waitForPermission(rateLimiter, permits);
return function.apply(t);
try {
R result = function.apply(t);
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand All @@ -296,7 +325,14 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(RateLimiter rateLim
Function<T, Integer> permitsCalculator, CheckedFunction1<T, R> function) {
return (T t) -> {
waitForPermission(rateLimiter, permitsCalculator.apply(t));
return function.apply(t);
try {
R result = function.apply(t);
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -325,7 +361,14 @@ static <T> Supplier<T> decorateSupplier(RateLimiter rateLimiter, int permits,
Supplier<T> supplier) {
return () -> {
waitForPermission(rateLimiter, permits);
return supplier.get();
try {
T result = supplier.get();
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -356,7 +399,18 @@ static <T> Supplier<Try<T>> decorateTrySupplier(RateLimiter rateLimiter, int per
return () -> {
try {
waitForPermission(rateLimiter, permits);
return supplier.get();
try {
Try<T> result = supplier.get();
if (result.isSuccess()) {
rateLimiter.onResult(result.get());
} else {
rateLimiter.onError(result.getCause());
}
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
} catch (RequestNotPermitted requestNotPermitted) {
return Try.failure(requestNotPermitted);
}
Expand Down Expand Up @@ -390,7 +444,18 @@ static <T> Supplier<Either<Exception, T>> decorateEitherSupplier(RateLimiter rat
return () -> {
try {
waitForPermission(rateLimiter, permits);
return Either.narrow(supplier.get());
try {
Either<? extends Exception, T> result = supplier.get();
if (result.isRight()) {
rateLimiter.onResult(result.get());
} else {
rateLimiter.onError(result.getLeft());
}
return Either.narrow(result);
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
} catch (RequestNotPermitted requestNotPermitted) {
return Either.left(requestNotPermitted);
}
Expand Down Expand Up @@ -422,7 +487,14 @@ static <T> Callable<T> decorateCallable(RateLimiter rateLimiter, int permits,
Callable<T> callable) {
return () -> {
waitForPermission(rateLimiter, permits);
return callable.call();
try {
T result = callable.call();
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -451,7 +523,13 @@ static <T> Consumer<T> decorateConsumer(RateLimiter rateLimiter, int permits,
Consumer<T> consumer) {
return (T t) -> {
waitForPermission(rateLimiter, permits);
consumer.accept(t);
try {
consumer.accept(t);
rateLimiter.onSuccess();
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand All @@ -469,7 +547,13 @@ static <T> Consumer<T> decorateConsumer(RateLimiter rateLimiter,
Function<T, Integer> permitsCalculator, Consumer<T> consumer) {
return (T t) -> {
waitForPermission(rateLimiter, permitsCalculator.apply(t));
consumer.accept(t);
try {
consumer.accept(t);
rateLimiter.onSuccess();
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand All @@ -495,7 +579,13 @@ static Runnable decorateRunnable(RateLimiter rateLimiter, Runnable runnable) {
static Runnable decorateRunnable(RateLimiter rateLimiter, int permits, Runnable runnable) {
return () -> {
waitForPermission(rateLimiter, permits);
runnable.run();
try {
runnable.run();
rateLimiter.onSuccess();
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -527,7 +617,14 @@ static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, int permi
Function<T, R> function) {
return (T t) -> {
waitForPermission(rateLimiter, permits);
return function.apply(t);
try {
R result = function.apply(t);
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand All @@ -546,7 +643,14 @@ static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter,
Function<T, Integer> permitsCalculator, Function<T, R> function) {
return (T t) -> {
waitForPermission(rateLimiter, permitsCalculator.apply(t));
return function.apply(t);
try {
R result = function.apply(t);
rateLimiter.onResult(result);
return result;
} catch (Exception exception) {
rateLimiter.onError(exception);
throw exception;
}
};
}

Expand Down Expand Up @@ -581,6 +685,24 @@ static void waitForPermission(final RateLimiter rateLimiter, int permits) {
}
}

/**
* Will drain permits remaining in cycle if calls result meet the criteria defined in
* {@link RateLimiterConfig#getDrainPermissionsOnResult()}.
*
* @param callsResult result of a methods call that was rate limiter by this rate limiter
*/
default void drainIfNeeded(Either<? extends Throwable, ?> callsResult) {
Function<Either<? extends Throwable, ?>, Boolean> checker = getRateLimiterConfig()
.getDrainPermissionsOnResult();
if (checker == null) {
return;
}
boolean drainNeeded = checker.apply(callsResult);
if (drainNeeded) {
drainPermissions();
}
}

/**
* Decorates and executes the decorated CompletionStage.
*
Expand Down Expand Up @@ -668,6 +790,33 @@ default long reservePermission() {
*/
void drainPermissions();

/**
* Records a failed call. This method must be invoked when a call failed.
*
* @param throwable The throwable which must be recorded
*/
default void onError(Throwable throwable) {
drainIfNeeded(Either.left(throwable));
}

/**
* Records a successful call. This method must be invoked when a call was
* successful.
*/
default void onSuccess() {
drainIfNeeded(Either.right(null));
}

/**
* This method must be invoked when a call returned a result
* and the result predicate should decide if the call was successful or not.
*
* @param result The result of the protected function
*/
default void onResult(Object result) {
drainIfNeeded(Either.right(result));
}

/**
* Get the name of this RateLimiter
*
Expand Down
Loading

0 comments on commit 4387316

Please sign in to comment.