Skip to content

Commit

Permalink
[FLINK-24455][tests]FallbackAkkaRpcSystemLoader checks maven exit code
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Nov 11, 2021
1 parent 24d2779 commit 942bc87
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.core.classloading.SubmoduleClassLoader;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemLoader;
import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.IOUtils;

import java.io.IOException;
Expand All @@ -42,6 +43,12 @@
*/
public class AkkaRpcSystemLoader implements RpcSystemLoader {

/** The name of the akka dependency jar, bundled with flink-rpc-akka-loader module artifact. */
private static final String FLINK_RPC_AKKA_FAT_JAR = "flink-rpc-akka.jar";

static final String HINT_USAGE =
"mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader";

@Override
public RpcSystem loadRpcSystem(Configuration config) {
try {
Expand All @@ -54,12 +61,14 @@ public RpcSystem loadRpcSystem(Configuration config) {
tmpDirectory.resolve("flink-rpc-akka_" + UUID.randomUUID() + ".jar"));

final InputStream resourceStream =
flinkClassLoader.getResourceAsStream("flink-rpc-akka.jar");
flinkClassLoader.getResourceAsStream(FLINK_RPC_AKKA_FAT_JAR);
if (resourceStream == null) {
throw new RuntimeException(
"Akka RPC system could not be found. If this happened while running a test in the IDE,"
+ "run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line,"
+ "or add a test dependency on the flink-rpc-akka-loader test-jar.");
throw new RpcLoaderException(
String.format(
"Akka RPC system could not be found. If this happened while running a test in the IDE, "
+ "run '%s' on the command-line, "
+ "or add a test dependency on the flink-rpc-akka-loader test-jar.",
HINT_USAGE));
}

IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.core.classloading.SubmoduleClassLoader;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemLoader;
import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.OperatingSystem;

import org.slf4j.Logger;
Expand All @@ -46,6 +47,9 @@
public class FallbackAkkaRpcSystemLoader implements RpcSystemLoader {
private static final Logger LOG = LoggerFactory.getLogger(FallbackAkkaRpcSystemLoader.class);

private static final String MODULE_FLINK_RPC = "flink-rpc";
private static final String MODULE_FLINK_RPC_AKKA = "flink-rpc-akka";

@Override
public RpcSystem loadRpcSystem(Configuration config) {
try {
Expand All @@ -67,7 +71,13 @@ public RpcSystem loadRpcSystem(Configuration config) {
akkaRpcModuleDirectory.resolve(Paths.get("target", "dependencies"));

if (!Files.exists(akkaRpcModuleDependenciesDirectory)) {
downloadDependencies(akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
int exitCode =
downloadDependencies(
akkaRpcModuleDirectory, akkaRpcModuleDependenciesDirectory);
if (exitCode != 0) {
throw new RpcLoaderException(
"Could not download dependencies of flink-rpc-akka, please see the log output for details.");
}
} else {
LOG.debug(
"Re-using previously downloaded flink-rpc-akka dependencies. If you are experiencing strange issues, try clearing '{}'.",
Expand Down Expand Up @@ -95,8 +105,10 @@ public RpcSystem loadRpcSystem(Configuration config) {
submoduleClassLoader,
null);
} catch (Exception e) {
throw new RuntimeException(
"Could not initialize RPC system. Run 'mvn package -pl flink-rpc/flink-rpc-akka,flink-rpc/flink-rpc-akka-loader' on the command-line.",
throw new RpcLoaderException(
String.format(
"Could not initialize RPC system. Run '%s' on the command-line instead.",
AkkaRpcSystemLoader.HINT_USAGE),
e);
}
}
Expand All @@ -109,18 +121,18 @@ private static Path findAkkaRpcModuleDirectory(Path currentParentCandidate) thro
try (Stream<Path> directoryContents = Files.list(currentParentCandidate)) {
final Optional<Path> flinkRpcModuleDirectory =
directoryContents
.filter(path -> path.getFileName().toString().equals("flink-rpc"))
.filter(path -> path.getFileName().toString().equals(MODULE_FLINK_RPC))
.findFirst();
if (flinkRpcModuleDirectory.isPresent()) {
return flinkRpcModuleDirectory
.map(path -> path.resolve(Paths.get("flink-rpc-akka")))
.map(path -> path.resolve(Paths.get(MODULE_FLINK_RPC_AKKA)))
.get();
}
}
return findAkkaRpcModuleDirectory(currentParentCandidate.getParent());
}

private static void downloadDependencies(Path workingDirectory, Path targetDirectory)
private static int downloadDependencies(Path workingDirectory, Path targetDirectory)
throws IOException, InterruptedException {

final String mvnExecutable = OperatingSystem.isWindows() ? "mvn.bat" : "mvn";
Expand All @@ -134,6 +146,6 @@ private static void downloadDependencies(Path workingDirectory, Path targetDirec
"-DincludeScope=runtime", // excludes provided/test dependencies
"-DoutputDirectory=" + targetDirectory)
.redirectOutput(ProcessBuilder.Redirect.INHERIT);
mvn.start().waitFor();
return mvn.start().waitFor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.exceptions.RpcLoaderException;
import org.apache.flink.util.ExceptionUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -102,7 +103,7 @@ static RpcSystem load(Configuration config) {
loadError = ExceptionUtils.firstOrSuppressed(e, loadError);
}
}
throw new RuntimeException("Could not load RpcSystem.", loadError);
throw new RpcLoaderException("Could not load RpcSystem.", loadError);
}

/** Descriptor for creating a fork-join thread-pool. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rpc.exceptions;

/** Base class for RPC loader exceptions. */
public class RpcLoaderException extends RuntimeException {

private static final long serialVersionUID = 7787884485642531050L;

public RpcLoaderException(String message) {
super(message);
}

public RpcLoaderException(Throwable cause) {
super(cause);
}

public RpcLoaderException(String message, Throwable cause) {
super(message, cause);
}
}

0 comments on commit 942bc87

Please sign in to comment.