Skip to content

Commit

Permalink
Simplify the WrappedClusterManager
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 26, 2024
1 parent 0bfec34 commit b086b4c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
public class ClusteredEventBusTest extends ClusteredEventBusTestBase {

@Test
public void testLocalHandlerNotVisibleRemotely() throws Exception {
public void testLocalHandlerNotVisibleRemotely() {
startNodes(2);
vertices[1].eventBus().localConsumer(ADDRESS1).handler(msg -> {
fail("Should not receive message");
Expand Down Expand Up @@ -396,21 +396,15 @@ public void testSendWriteHandler() throws Exception {
CountDownLatch updateLatch = new CountDownLatch(3);
startNodes(2, () -> new WrappedClusterManager(getClusterManager()) {
@Override
public void registrationListener(RegistrationListener registrationListener) {
super.registrationListener(new WrappedNodeSelector((NodeSelector) registrationListener) {
@Override
public void registrationsUpdated(RegistrationUpdateEvent event) {
super.registrationsUpdated(event);
if (event.address().equals(ADDRESS1) && event.registrations().size() == 1) {
updateLatch.countDown();
}
}

@Override
public boolean wantsUpdatesFor(String address) {
return true;
}
});
public void registrationsUpdated(RegistrationUpdateEvent event) {
super.registrationsUpdated(event);
if (event.address().equals(ADDRESS1) && event.registrations().size() == 1) {
updateLatch.countDown();
}
}
@Override
public boolean wantsUpdatesFor(String address) {
return true;
}
});
waitFor(2);
Expand Down Expand Up @@ -486,33 +480,20 @@ public void testWriteHandlerConnectFailure() {

@Test
public void testSelectorWantsUpdates() {
AtomicReference<NodeSelector> nodeSelectorRef = new AtomicReference<>();
startNodes(1, () -> new WrappedClusterManager(getClusterManager()) {
@Override
public void registrationListener(RegistrationListener registrationListener) {
nodeSelectorRef.set((NodeSelector) registrationListener);
super.registrationListener(registrationListener);
}
});
assertNotNull(nodeSelectorRef.get());
WrappedClusterManager wrapped = new WrappedClusterManager(getClusterManager());
startNodes(1, () -> wrapped);
vertices[0].eventBus().consumer(ADDRESS1, msg -> {
assertTrue(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1));
assertTrue(wrapped.wantsUpdatesFor(ADDRESS1));
testComplete();
}).completion().onComplete(onSuccess(v -> vertices[0].eventBus().send(ADDRESS1, "foo")));
await();
}

@Test
public void testSelectorDoesNotWantUpdates() {
AtomicReference<NodeSelector> nodeSelectorRef = new AtomicReference<>();
startNodes(1, () -> new WrappedClusterManager(getClusterManager()) {
@Override
public void registrationListener(RegistrationListener registrationListener) {
nodeSelectorRef.set((NodeSelector) registrationListener);
}
});
assertNotNull(nodeSelectorRef.get());
assertFalse(nodeSelectorRef.get().wantsUpdatesFor(ADDRESS1));
WrappedClusterManager wrapped = new WrappedClusterManager(getClusterManager());
startNodes(1, () -> wrapped);
assertFalse(wrapped.wantsUpdatesFor(ADDRESS1));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

import io.vertx.core.*;
import io.vertx.core.eventbus.*;
import io.vertx.core.spi.cluster.RegistrationListener;
import io.vertx.tests.shareddata.AsyncMapTest;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.impl.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
Expand Down Expand Up @@ -125,16 +123,11 @@ public void testClusteredUnregistration() throws Exception {
CountDownLatch updateLatch = new CountDownLatch(3);
startNodes(2, () -> new WrappedClusterManager(getClusterManager()) {
@Override
public void registrationListener(RegistrationListener registrationListener) {
super.registrationListener(new WrappedNodeSelector((NodeSelector) registrationListener) {
@Override
public void registrationsUpdated(RegistrationUpdateEvent event) {
super.registrationsUpdated(event);
if (event.address().equals("foo") && event.registrations().isEmpty()) {
updateLatch.countDown();
}
}
});
public void registrationsUpdated(RegistrationUpdateEvent event) {
super.registrationsUpdated(event);
if (event.address().equals("foo") && event.registrations().isEmpty()) {
updateLatch.countDown();
}
}
});
MessageConsumer<Object> consumer = vertices[0].eventBus().consumer("foo", msg -> msg.reply(msg.body()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.*;
import io.vertx.core.spi.cluster.impl.NodeSelector;

import java.util.List;
import java.util.Map;

public class WrappedClusterManager implements ClusterManager {

private final ClusterManager delegate;
private NodeSelector nodeSelector;

public WrappedClusterManager(ClusterManager delegate) {
this.delegate = delegate;
Expand Down Expand Up @@ -100,8 +102,39 @@ public boolean isActive() {
}

@Override
public void registrationListener(RegistrationListener registrationListener) {
delegate.registrationListener(registrationListener);
public final void registrationListener(RegistrationListener registrationListener) {
nodeSelector = (NodeSelector) registrationListener;
NodeSelector interceptor = new NodeSelector() {
@Override
public void init(Vertx vertx, ClusterManager clusterManager) {
nodeSelector.init(vertx, clusterManager);
}
@Override
public void eventBusStarted() {
nodeSelector.eventBusStarted();
}
@Override
public void selectForSend(String address, Promise<String> promise) {
nodeSelector.selectForSend(address, promise);
}
@Override
public void selectForPublish(String address, Promise<Iterable<String>> promise) {
nodeSelector.selectForPublish(address, promise);
}
@Override
public void registrationsUpdated(RegistrationUpdateEvent event) {
WrappedClusterManager.this.registrationsUpdated(event);
}
@Override
public void registrationsLost() {
WrappedClusterManager.this.registrationsLost();
}
@Override
public boolean wantsUpdatesFor(String address) {
return WrappedClusterManager.this.wantsUpdatesFor(address);
}
};
delegate.registrationListener(interceptor);
}

@Override
Expand Down Expand Up @@ -132,4 +165,16 @@ public String clusterPublicHost() {
public ClusterManager getDelegate() {
return delegate;
}

public void registrationsUpdated(RegistrationUpdateEvent event) {
nodeSelector.registrationsUpdated(event);
}

public void registrationsLost() {
nodeSelector.registrationsLost();
}

public boolean wantsUpdatesFor(String address) {
return nodeSelector.wantsUpdatesFor(address);
}
}

This file was deleted.

0 comments on commit b086b4c

Please sign in to comment.