From 942bc87e34cbcd17201646790d75625740ce83ad Mon Sep 17 00:00:00 2001 From: Aitozi Date: Thu, 11 Nov 2021 20:56:12 +0800 Subject: [PATCH] [FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code --- .../runtime/rpc/akka/AkkaRpcSystemLoader.java | 19 +++++++--- .../rpc/akka/FallbackAkkaRpcSystemLoader.java | 26 +++++++++---- .../apache/flink/runtime/rpc/RpcSystem.java | 3 +- .../rpc/exceptions/RpcLoaderException.java | 37 +++++++++++++++++++ 4 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java diff --git a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java index ed8977ab8edd6..216769e9c6d89 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java +++ b/flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java @@ -22,6 +22,7 @@ import org.apache.flink.core.classloading.SubmoduleClassLoader; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.runtime.rpc.RpcSystemLoader; +import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException; import org.apache.flink.util.IOUtils; import java.io.IOException; @@ -42,6 +43,12 @@ */ public class AkkaRpcSystemLoader implements RpcSystemLoader { + /** The name of the akka dependency jar, bundled with flink-rpc-akka-loader module artifact. */ + private static final String FLINK_RPC_AKKA_FAT_JAR = "flink-rpc-akka.jar"; + + static final String HINT_USAGE = + "mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader"; + @Override public RpcSystem loadRpcSystem(Configuration config) { try { @@ -54,12 +61,14 @@ public RpcSystem loadRpcSystem(Configuration config) { tmpDirectory.resolve("flink-rpc-akka_" + UUID.randomUUID() + ".jar")); final InputStream resourceStream = - flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar"); + flinkClassLoader.getResourceAsStream(FLINK_RPC_AKKA_FAT_JAR); if (resourceStream == null) { - throw new RuntimeException( - "Akka RPC system could not be found. If this happened while running a test in the IDE," - + "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line," - + "or add a test dependency on the flink-rpc-akka-loader test-jar."); + throw new RpcLoaderException( + String.format( + "Akka RPC system could not be found. If this happened while running a test in the IDE, " + + "run '%s' on the command-line, " + + "or add a test dependency on the flink-rpc-akka-loader test-jar.", + HINT_USAGE)); } IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile)); diff --git a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java index a8624bb3b965c..1b8fc8890eb35 100644 --- a/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java +++ b/flink-rpc/flink-rpc-akka-loader/src/test/java/org/apache/flink/runtime/rpc/akka/FallbackAkkaRpcSystemLoader.java @@ -21,6 +21,7 @@ import org.apache.flink.core.classloading.SubmoduleClassLoader; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.runtime.rpc.RpcSystemLoader; +import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException; import org.apache.flink.util.OperatingSystem; import org.slf4j.Logger; @@ -46,6 +47,9 @@ public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader { private static final Logger LOG = LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class); + private static final String MODULE_FLINK_RPC = "flink-rpc"; + private static final String MODULE_FLINK_RPC_AKKA = "flink-rpc-akka"; + @Override public RpcSystem loadRpcSystem(Configuration config) { try { @@ -67,7 +71,13 @@ public RpcSystem loadRpcSystem(Configuration config) { akkaRpcModuleDirectory.resolve(Paths.get("target", "dependencies")); if (!Files.exists(akkaRpcModuleDependenciesDirectory)) { - downloadDependencies(akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory); + int exitCode = + downloadDependencies( + akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory); + if (exitCode != 0) { + throw new RpcLoaderException( + "Could not download dependencies of flink-rpc-akka, please see the log output for details."); + } } else { LOG.debug( "Re-using previously downloaded flink-rpc-akka dependencies. If you are experiencing strange issues, try clearing '{}'.", @@ -95,8 +105,10 @@ public RpcSystem loadRpcSystem(Configuration config) { submoduleClassLoader, null); } catch (Exception e) { - throw new RuntimeException( - "Could not initialize RPC system. Run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.", + throw new RpcLoaderException( + String.format( + "Could not initialize RPC system. Run '%s' on the command-line instead.", + AkkaRpcSystemLoader.HINT_USAGE), e); } } @@ -109,18 +121,18 @@ private static Path findAkkaRpcModuleDirectory(Path currentParentCandidate) thro try (Stream directoryContents = Files.list(currentParentCandidate)) { final Optional flinkRpcModuleDirectory = directoryContents - .filter(path -> path.getFileName().toString().equals("flink-rpc")) + .filter(path -> path.getFileName().toString().equals(MODULE_FLINK_RPC)) .findFirst(); if (flinkRpcModuleDirectory.isPresent()) { return flinkRpcModuleDirectory - .map(path -> path.resolve(Paths.get("flink-rpc-akka"))) + .map(path -> path.resolve(Paths.get(MODULE_FLINK_RPC_AKKA))) .get(); } } return findAkkaRpcModuleDirectory(currentParentCandidate.getParent()); } - private static void downloadDependencies(Path workingDirectory, Path targetDirectory) + private static int downloadDependencies(Path workingDirectory, Path targetDirectory) throws IOException, InterruptedException { final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : "mvn"; @@ -134,6 +146,6 @@ private static void downloadDependencies(Path workingDirectory, Path targetDirec "-DincludeScope=runtime", // excludes provided/test dependencies "-DoutputDirectory=" + targetDirectory) .redirectOutput(ProcessBuilder.Redirect.INHERIT); - mvn.start().waitFor(); + return mvn.start().waitFor(); } } diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java index e6eab2d4107da..cced3ca8f54e4 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException; import org.apache.flink.util.ExceptionUtils; import javax.annotation.Nullable; @@ -102,7 +103,7 @@ static RpcSystem load(Configuration config) { loadError = ExceptionUtils.firstOrSuppressed(e, loadError); } } - throw new RuntimeException("Could not load RpcSystem.", loadError); + throw new RpcLoaderException("Could not load RpcSystem.", loadError); } /** Descriptor for creating a fork-join thread-pool. */ diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java new file mode 100644 index 0000000000000..70449c5c1c030 --- /dev/null +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcLoaderException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.exceptions; + +/** Base class for RPC loader exceptions. */ +public class RpcLoaderException extends RuntimeException { + + private static final long serialVersionUID = 7787884485642531050L; + + public RpcLoaderException(String message) { + super(message); + } + + public RpcLoaderException(Throwable cause) { + super(cause); + } + + public RpcLoaderException(String message, Throwable cause) { + super(message, cause); + } +}