Skip to content

Commit

Permalink
[FLINK-23504][tests] Isolate TestingRpcService from Akka
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jul 27, 2021
1 parent d387c30 commit abf9752
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 32 deletions.
11 changes: 11 additions & 0 deletions flink-rpc/flink-rpc-akka-loader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>forbid-direct-akka-rpc-dependencies</id>
<!-- This module needs the dependency in order to bundle flink-rpc-akka. -->
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.rpc.akka;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
Expand All @@ -28,7 +29,6 @@
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
Expand Down Expand Up @@ -87,7 +87,10 @@ public class AkkaRpcActorTest extends TestLogger {

@BeforeClass
public static void setup() {
akkaRpcService = new TestingRpcService();
akkaRpcService =
new AkkaRpcService(
AkkaUtils.createLocalActorSystem(new Configuration()),
AkkaRpcServiceConfiguration.defaultConfiguration());
}

@AfterClass
Expand Down
15 changes: 0 additions & 15 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,6 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-rpc-akka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-rpc-akka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty-tcnative-dynamic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
package org.apache.flink.runtime.rpc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.akka.AkkaUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -51,7 +52,11 @@
* verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString());
* }</pre>
*/
public class TestingRpcService extends AkkaRpcService {
public class TestingRpcService implements RpcService {

// load RpcSystem once to save initialization costs
// this is safe because it is state-less
private static final RpcSystem RPC_SYSTEM_SINGLETON = RpcSystem.load();

private static final Function<RpcGateway, CompletableFuture<RpcGateway>>
DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
Expand All @@ -62,16 +67,16 @@ public class TestingRpcService extends AkkaRpcService {
private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction =
DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;

/** Creates a new {@code TestingRpcService}. */
public TestingRpcService() {
this(new Configuration());
}
private final RpcService backingRpcService;

/** Creates a new {@code TestingRpcService}, using the given configuration. */
public TestingRpcService(Configuration configuration) {
super(
AkkaUtils.createLocalActorSystem(configuration),
AkkaRpcServiceConfiguration.fromConfiguration(configuration));
public TestingRpcService() {
try {
this.backingRpcService =
RPC_SYSTEM_SINGLETON.localServiceBuilder(new Configuration()).createAndStart();
} catch (Exception e) {
throw new RuntimeException(e);
}

this.registeredConnections = new ConcurrentHashMap<>();
}
Expand All @@ -80,7 +85,7 @@ public TestingRpcService(Configuration configuration) {

@Override
public CompletableFuture<Void> stopService() {
final CompletableFuture<Void> terminationFuture = super.stopService();
final CompletableFuture<Void> terminationFuture = backingRpcService.stopService();

terminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
Expand Down Expand Up @@ -126,7 +131,7 @@ public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class
+ clazz));
}
} else {
return super.connect(address, clazz);
return backingRpcService.connect(address, clazz);
}
}

Expand All @@ -149,7 +154,7 @@ public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture
+ clazz));
}
} else {
return super.connect(address, fencingToken, clazz);
return backingRpcService.connect(address, fencingToken, clazz);
}
}

Expand All @@ -165,4 +170,58 @@ public void setRpcGatewayFutureFunction(
Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
}

// ------------------------------------------------------------------------
// simple wrappers
// ------------------------------------------------------------------------

@Override
public String getAddress() {
return backingRpcService.getAddress();
}

@Override
public int getPort() {
return backingRpcService.getPort();
}

@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
return backingRpcService.startServer(rpcEndpoint);
}

@Override
public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
return backingRpcService.fenceRpcServer(rpcServer, fencingToken);
}

@Override
public void stopServer(RpcServer selfGateway) {
backingRpcService.stopServer(selfGateway);
}

@Override
public CompletableFuture<Void> getTerminationFuture() {
return backingRpcService.getTerminationFuture();
}

@Override
public ScheduledExecutor getScheduledExecutor() {
return backingRpcService.getScheduledExecutor();
}

@Override
public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
return backingRpcService.scheduleRunnable(runnable, delay, unit);
}

@Override
public void execute(Runnable runnable) {
backingRpcService.execute(runnable);
}

@Override
public <T> CompletableFuture<T> execute(Callable<T> callable) {
return backingRpcService.execute(callable);
}
}
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,24 @@ under the License.
</rules>
</configuration>
</execution>
<execution>
<id>forbid-direct-akka-rpc-dependencies</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<excludes>
<exclude>org.apache.flink:flink-rpc-akka</exclude>
</excludes>
<message>
Direct dependencies on flink-rpc-akka are not allowed. Depend on flink-rpc-akka-loader instead, and use RpcSystem#load or the TestingRpcService.
</message>
</bannedDependencies>
</rules>
</configuration>
</execution>
<execution>
<id>dependency-convergence</id>
<!-- disabled by default as it interacts badly with shade-plugin -->
Expand Down

0 comments on commit abf9752

Please sign in to comment.