diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTest.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTest.java index 15a6ec3442a..384489051f8 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTest.java @@ -45,7 +45,7 @@ public class ClusteredEventBusTest extends ClusteredEventBusTestBase { @Test - public void testLocalHandlerNotVisibleRemotely() throws Exception { + public void testLocalHandlerNotVisibleRemotely() { startNodes(2); vertices[1].eventBus().localConsumer(ADDRESS1).handler(msg -> { fail("Should not receive message"); @@ -396,21 +396,15 @@ public void testSendWriteHandler() throws Exception { CountDownLatch updateLatch = new CountDownLatch(3); startNodes(2, () -> new WrappedClusterManager(getClusterManager()) { @Override - public void registrationListener(RegistrationListener registrationListener) { - super.registrationListener(new WrappedNodeSelector((NodeSelector) registrationListener) { - @Override - public void registrationsUpdated(RegistrationUpdateEvent event) { - super.registrationsUpdated(event); - if (event.address().equals(ADDRESS1) && event.registrations().size() == 1) { - updateLatch.countDown(); - } - } - - @Override - public boolean wantsUpdatesFor(String address) { - return true; - } - }); + public void registrationsUpdated(RegistrationUpdateEvent event) { + super.registrationsUpdated(event); + if (event.address().equals(ADDRESS1) && event.registrations().size() == 1) { + updateLatch.countDown(); + } + } + @Override + public boolean wantsUpdatesFor(String address) { + return true; } }); waitFor(2); @@ -486,17 +480,10 @@ public void testWriteHandlerConnectFailure() { @Test public void testSelectorWantsUpdates() { - AtomicReference nodeSelectorRef = new AtomicReference<>(); - startNodes(1, () -> new WrappedClusterManager(getClusterManager()) { - @Override - public void registrationListener(RegistrationListener registrationListener) { - nodeSelectorRef.set((NodeSelector) registrationListener); - super.registrationListener(registrationListener); - } - }); - assertNotNull(nodeSelectorRef.get()); + WrappedClusterManager wrapped = new WrappedClusterManager(getClusterManager()); + startNodes(1, () -> wrapped); vertices[0].eventBus().consumer(ADDRESS1, msg -> { - assertTrue(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1)); + assertTrue(wrapped.wantsUpdatesFor(ADDRESS1)); testComplete(); }).completion().onComplete(onSuccess(v -> vertices[0].eventBus().send(ADDRESS1, "foo"))); await(); @@ -504,15 +491,9 @@ public void registrationListener(RegistrationListener registrationListener) { @Test public void testSelectorDoesNotWantUpdates() { - AtomicReference nodeSelectorRef = new AtomicReference<>(); - startNodes(1, () -> new WrappedClusterManager(getClusterManager()) { - @Override - public void registrationListener(RegistrationListener registrationListener) { - nodeSelectorRef.set((NodeSelector) registrationListener); - } - }); - assertNotNull(nodeSelectorRef.get()); - assertFalse(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1)); + WrappedClusterManager wrapped = new WrappedClusterManager(getClusterManager()); + startNodes(1, () -> wrapped); + assertFalse(wrapped.wantsUpdatesFor(ADDRESS1)); } @Test diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTestBase.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTestBase.java index ddd1fafd0ba..5aa5f84c6b8 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTestBase.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/ClusteredEventBusTestBase.java @@ -13,10 +13,8 @@ import io.vertx.core.*; import io.vertx.core.eventbus.*; -import io.vertx.core.spi.cluster.RegistrationListener; import io.vertx.tests.shareddata.AsyncMapTest; import io.vertx.core.spi.cluster.ClusterManager; -import io.vertx.core.spi.cluster.impl.NodeSelector; import io.vertx.core.spi.cluster.RegistrationUpdateEvent; import io.vertx.test.core.TestUtils; import io.vertx.test.fakecluster.FakeClusterManager; @@ -125,16 +123,11 @@ public void testClusteredUnregistration() throws Exception { CountDownLatch updateLatch = new CountDownLatch(3); startNodes(2, () -> new WrappedClusterManager(getClusterManager()) { @Override - public void registrationListener(RegistrationListener registrationListener) { - super.registrationListener(new WrappedNodeSelector((NodeSelector) registrationListener) { - @Override - public void registrationsUpdated(RegistrationUpdateEvent event) { - super.registrationsUpdated(event); - if (event.address().equals("foo") && event.registrations().isEmpty()) { - updateLatch.countDown(); - } - } - }); + public void registrationsUpdated(RegistrationUpdateEvent event) { + super.registrationsUpdated(event); + if (event.address().equals("foo") && event.registrations().isEmpty()) { + updateLatch.countDown(); + } } }); MessageConsumer consumer = vertices[0].eventBus().consumer("foo", msg -> msg.reply(msg.body())); diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedClusterManager.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedClusterManager.java index 49c4902ca91..26978e9cd25 100644 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedClusterManager.java +++ b/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedClusterManager.java @@ -17,6 +17,7 @@ import io.vertx.core.shareddata.Counter; import io.vertx.core.shareddata.Lock; import io.vertx.core.spi.cluster.*; +import io.vertx.core.spi.cluster.impl.NodeSelector; import java.util.List; import java.util.Map; @@ -24,6 +25,7 @@ public class WrappedClusterManager implements ClusterManager { private final ClusterManager delegate; + private NodeSelector nodeSelector; public WrappedClusterManager(ClusterManager delegate) { this.delegate = delegate; @@ -100,8 +102,39 @@ public boolean isActive() { } @Override - public void registrationListener(RegistrationListener registrationListener) { - delegate.registrationListener(registrationListener); + public final void registrationListener(RegistrationListener registrationListener) { + nodeSelector = (NodeSelector) registrationListener; + NodeSelector interceptor = new NodeSelector() { + @Override + public void init(Vertx vertx, ClusterManager clusterManager) { + nodeSelector.init(vertx, clusterManager); + } + @Override + public void eventBusStarted() { + nodeSelector.eventBusStarted(); + } + @Override + public void selectForSend(String address, Promise promise) { + nodeSelector.selectForSend(address, promise); + } + @Override + public void selectForPublish(String address, Promise> promise) { + nodeSelector.selectForPublish(address, promise); + } + @Override + public void registrationsUpdated(RegistrationUpdateEvent event) { + WrappedClusterManager.this.registrationsUpdated(event); + } + @Override + public void registrationsLost() { + WrappedClusterManager.this.registrationsLost(); + } + @Override + public boolean wantsUpdatesFor(String address) { + return WrappedClusterManager.this.wantsUpdatesFor(address); + } + }; + delegate.registrationListener(interceptor); } @Override @@ -132,4 +165,16 @@ public String clusterPublicHost() { public ClusterManager getDelegate() { return delegate; } + + public void registrationsUpdated(RegistrationUpdateEvent event) { + nodeSelector.registrationsUpdated(event); + } + + public void registrationsLost() { + nodeSelector.registrationsLost(); + } + + public boolean wantsUpdatesFor(String address) { + return nodeSelector.wantsUpdatesFor(address); + } } diff --git a/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedNodeSelector.java b/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedNodeSelector.java deleted file mode 100644 index c5074bf092f..00000000000 --- a/vertx-core/src/test/java/io/vertx/tests/eventbus/WrappedNodeSelector.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ - -package io.vertx.tests.eventbus; - -import io.vertx.core.Promise; -import io.vertx.core.Vertx; -import io.vertx.core.spi.cluster.ClusterManager; -import io.vertx.core.spi.cluster.RegistrationUpdateEvent; -import io.vertx.core.spi.cluster.impl.NodeSelector; - -public class WrappedNodeSelector implements NodeSelector { - - private final NodeSelector delegate; - - public WrappedNodeSelector(NodeSelector delegate) { - this.delegate = delegate; - } - - @Override - public void init(Vertx vertx, ClusterManager clusterManager) { - delegate.init(vertx, clusterManager); - } - - @Override - public void eventBusStarted() { - delegate.eventBusStarted(); - } - - @Override - public void selectForSend(String address, Promise promise) { - delegate.selectForSend(address, promise); - } - - @Override - public void selectForPublish(String address, Promise> promise) { - delegate.selectForPublish(address, promise); - } - - @Override - public void registrationsUpdated(RegistrationUpdateEvent event) { - delegate.registrationsUpdated(event); - } - - @Override - public void registrationsLost() { - delegate.registrationsLost(); - } - - @Override - public boolean wantsUpdatesFor(String address) { - return delegate.wantsUpdatesFor(address); - } -}