Skip to content

Commit

Permalink
[FLINK-23713][tests] Do not hide original exceptions in SavepointITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Aug 12, 2021
1 parent 1352705 commit 938cf8d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 49 deletions.
33 changes: 33 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,17 @@ public static <T extends Throwable> Optional<T> findThrowable(
return Optional.empty();
}

/**
* The same as {@link #findThrowable(Throwable, Class)}, but rethrows original exception if the
* expected exception was not found.
*/
public static <T extends Throwable> void assertThrowable(
Throwable throwable, Class<T> searchType) throws T {
if (!findThrowable(throwable, searchType).isPresent()) {
throw (T) throwable;
}
}

/**
* Checks whether a throwable chain contains a specific type of exception and returns it. This
* method handles {@link SerializedThrowable}s in the chain and deserializes them with the given
Expand Down Expand Up @@ -541,6 +552,17 @@ public static Optional<Throwable> findThrowable(
return Optional.empty();
}

/**
* The same as {@link #findThrowable(Throwable, Predicate)}, but rethrows original exception if
* the expected exception was not found.
*/
public static <T extends Throwable> void assertThrowable(
T throwable, Predicate<Throwable> predicate) throws T {
if (!findThrowable(throwable, predicate).isPresent()) {
throw (T) throwable;
}
}

/**
* Checks whether a throwable chain contains a specific error message and returns the
* corresponding throwable.
Expand All @@ -567,6 +589,17 @@ public static Optional<Throwable> findThrowableWithMessage(
return Optional.empty();
}

/**
* The same as {@link #findThrowableWithMessage(Throwable, String)}, but rethrows original
* exception if the expected exception was not found.
*/
public static <T extends Throwable> void assertThrowableWithMessage(
Throwable throwable, String searchMessage) throws T {
if (!findThrowableWithMessage(throwable, searchMessage).isPresent()) {
throw (T) throwable;
}
}

/**
* Unpacks an {@link ExecutionException} and returns its cause. Otherwise the given Throwable is
* returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,10 @@
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.EntropyInjectingTestFileSystem;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;

import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
Expand Down Expand Up @@ -127,16 +125,16 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
import static org.apache.flink.util.ExceptionUtils.assertThrowable;
import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -469,9 +467,8 @@ public void testTriggerSavepointForNonExistingJob() throws Exception {

fail();
} catch (ExecutionException e) {
assertTrue(
ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class).isPresent());
assertTrue(ExceptionUtils.findThrowableWithMessage(e, jobID.toString()).isPresent());
assertThrowable(e, FlinkJobNotFoundException.class);
assertThrowableWithMessage(e, jobID.toString());
} finally {
cluster.after();
}
Expand Down Expand Up @@ -510,13 +507,9 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {

fail();
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.findThrowable(e, IllegalStateException.class).isPresent());
assertTrue(
ExceptionUtils.findThrowableWithMessage(e, graph.getJobID().toString())
.isPresent());
assertTrue(
ExceptionUtils.findThrowableWithMessage(e, "is not a streaming job")
.isPresent());
assertThrowable(e, IllegalStateException.class);
assertThrowableWithMessage(e, graph.getJobID().toString());
assertThrowableWithMessage(e, "is not a streaming job");
} finally {
cluster.after();
}
Expand Down Expand Up @@ -578,12 +571,6 @@ static void resetForTest(int parallelism, boolean allowSnapshots) {
}
}

private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
.filter(e -> e.getCheckpointFailureReason() == CHECKPOINT_COORDINATOR_SHUTDOWN)
.isPresent();
}

@Test
public void testStopSavepointWithBoundedInput() throws Exception {
final int numTaskManagers = 2;
Expand Down Expand Up @@ -671,9 +658,9 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader());
} catch (Exception e) {
Optional<JobExecutionException> expectedJobExecutionException =
ExceptionUtils.findThrowable(e, JobExecutionException.class);
findThrowable(e, JobExecutionException.class);
Optional<FileNotFoundException> expectedFileNotFoundException =
ExceptionUtils.findThrowable(e, FileNotFoundException.class);
findThrowable(e, FileNotFoundException.class);
if (!(expectedJobExecutionException.isPresent()
&& expectedFileNotFoundException.isPresent())) {
throw e;
Expand Down Expand Up @@ -741,13 +728,15 @@ public void testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted() thr
.get();
fail("The future should fail exceptionally.");
} catch (ExecutionException e) {
assertThat(
e.getMessage(),
CoreMatchers.startsWith(
"org.apache.flink.util.FlinkException: Inconsistent execution state"
+ " after stopping with savepoint. At least one execution"
+ " is still in one of the following states: FAILED. "
+ "A global fail-over is triggered to recover the job"));
assertThrowable(
e,
ex ->
ex.getMessage()
.startsWith(
"org.apache.flink.util.FlinkException: Inconsistent execution state"
+ " after stopping with savepoint. At least one execution"
+ " is still in one of the following states: FAILED. "
+ "A global fail-over is triggered to recover the job"));
}
} finally {
cluster.after();
Expand Down Expand Up @@ -804,35 +793,37 @@ public HighAvailabilityServices createHAServices(
}
}

private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
private static BiFunction<JobID, ExecutionException, Boolean>
assertAfterSnapshotCreationFailure() {
return (jobId, actualException) -> {
if (ClusterOptions.isAdaptiveSchedulerEnabled(new Configuration())) {
assertThat(
actualException,
containsMessage("Stop with savepoint operation could not be completed"));
return actualException
.getMessage()
.contains("Stop with savepoint operation could not be completed");
} else {
Optional<FlinkException> actualFlinkException =
ExceptionUtils.findThrowable(actualException, FlinkException.class);
assertTrue(actualFlinkException.isPresent());

assertThat(
actualFlinkException.get(),
containsMessage(
findThrowable(actualException, FlinkException.class);
if (!actualFlinkException.isPresent()) {
return false;
}
return actualFlinkException
.get()
.getMessage()
.contains(
String.format(
"A global fail-over is triggered to recover the job %s.",
jobId)));
jobId));
}
};
}

private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
private static BiFunction<JobID, ExecutionException, Boolean>
assertInSnapshotCreationFailure() {
return (ignored, actualException) -> {
if (ClusterOptions.isAdaptiveSchedulerEnabled(new Configuration())) {
assertThat(actualException, containsCause(FlinkException.class));
return findThrowable(actualException, FlinkException.class).isPresent();
} else {
Optional<CheckpointException> actualFailureCause =
ExceptionUtils.findThrowable(actualException, CheckpointException.class);
assertTrue(actualFailureCause.isPresent());
return findThrowable(actualException, CheckpointException.class).isPresent();
}
};
}
Expand Down Expand Up @@ -862,7 +853,7 @@ private static void testStopWithFailingSourceInOnePipeline(
InfiniteTestSource failingSource,
File savepointDir,
int expectedMaximumNumberOfRestarts,
BiConsumer<JobID, ExecutionException> exceptionAssertion)
BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion)
throws Exception {
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
Expand Down Expand Up @@ -910,7 +901,7 @@ private static void testStopWithFailingSourceInOnePipeline(
.get();
fail("The future should fail exceptionally.");
} catch (ExecutionException e) {
exceptionAssertion.accept(jobGraph.getJobID(), e);
assertThrowable(e, ex -> exceptionAssertion.apply(jobGraph.getJobID(), e));
}

waitUntilAllTasksAreRunning(cluster.getRestAddres(), jobGraph.getJobID());
Expand Down

0 comments on commit 938cf8d

Please sign in to comment.