Skip to content

Commit

Permalink
Issue Fix ConcurrentModificationException in EventProcessor. (resilie…
Browse files Browse the repository at this point in the history
…nce4j#1133)


Co-authored-by: Ruslan Altynnikov <ruslan@spotify.com>
  • Loading branch information
rulle-io and rulle-sp committed Aug 20, 2020
1 parent f309519 commit e431fe0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@

import io.github.resilience4j.core.lang.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class EventProcessor<T> implements EventPublisher<T> {

List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
final List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
final ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
private boolean consumerRegistered;

public boolean hasConsumers() {
Expand All @@ -39,17 +38,17 @@ public boolean hasConsumers() {
@SuppressWarnings("unchecked")
public synchronized void registerConsumer(String className,
EventConsumer<? extends T> eventConsumer) {
this.consumerRegistered = true;
this.eventConsumerMap.compute(className, (k, consumers) -> {
if (consumers == null) {
consumers = new ArrayList<>();
consumers = new CopyOnWriteArrayList<>();
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
} else {
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}
});
this.consumerRegistered = true;
}

public <E extends T> boolean processEvent(E event) {
Expand All @@ -71,7 +70,7 @@ public <E extends T> boolean processEvent(E event) {

@Override
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
this.consumerRegistered = true;
this.onEventConsumers.add(onEventConsumer);
this.consumerRegistered = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import org.junit.Test;
import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -107,4 +113,43 @@ public void testNoConsumers() {
assertThat(consumed).isEqualTo(false);
}


@Test
public void testOnEventParallel() throws ExecutionException, InterruptedException {
CountDownLatch eventConsumed = new CountDownLatch(1);
CountDownLatch waitForConsumerRegistration = new CountDownLatch(1);

EventProcessor<Number> eventProcessor = new EventProcessor<>();
EventConsumer<Integer> eventConsumer1 = event -> {
try {
eventConsumed.countDown();
waitForConsumerRegistration.await(5, TimeUnit.SECONDS);
logger.info(event.toString());
} catch (InterruptedException e) {
fail("Must not happen");
}
};

EventConsumer<Integer> eventConsumer2 = event -> logger.info(event.toString());

// 1st consumer is added
eventProcessor.registerConsumer(Integer.class.getSimpleName(), eventConsumer1);

// process first event in a separate thread to create a race condition
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
eventProcessor.processEvent(1); // blocks because of the count down latch
});

eventConsumed.await(1, TimeUnit.SECONDS);

// 2nd consumer is added
eventProcessor.registerConsumer(Integer.class.getSimpleName(), eventConsumer2);

future.get();

waitForConsumerRegistration.countDown();

then(logger).should(times(1)).info("1");
}

}

0 comments on commit e431fe0

Please sign in to comment.