Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper class to capture context using ScheduledExecutorService #6712

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Adding more code coverage
  • Loading branch information
ammachado committed Sep 16, 2024
commit 81ce0053007a0dc13072e3ae9ec7eacb85953601
251 changes: 198 additions & 53 deletions context/src/test/java/io/opentelemetry/context/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -364,15 +365,154 @@ void wrapExecutor() {
@TestInstance(Lifecycle.PER_CLASS)
class WrapExecutorService {

protected ScheduledExecutorService executor;
protected ExecutorService executor;
protected ExecutorService wrapped;
protected AtomicReference<String> value;

protected ExecutorService wrap(ScheduledExecutorService executorService) {
protected ExecutorService wrap(ExecutorService executorService) {
return CAT.wrap(executorService);
}

protected ExecutorService wrap(ExecutorService executorService) {
@BeforeAll
void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor();
wrapped = wrap(executor);
}

@AfterAll
void stopExecutor() {
executor.shutdown();
}

@BeforeEach
void setUp() {
value = new AtomicReference<>();
}

@Test
void execute() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
wrapped.execute(runnable);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
}

@Test
void submitRunnable() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
Futures.getUnchecked(wrapped.submit(runnable));
assertThat(value).hasValue("cat");
}

@Test
void submitRunnableResult() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
assertThat(Futures.getUnchecked(wrapped.submit(runnable, "foo"))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}

@Test
void submitCallable() {
Callable<String> callable =
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(Futures.getUnchecked(wrapped.submit(callable))).isEqualTo("foo");
assertThat(value).hasValue("cat");
}

@Test
void invokeAll() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
List<Future<String>> futures = wrapped.invokeAll(Arrays.asList(callable1, callable2));
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}

@Test
void invokeAllTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
return "foo";
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
List<Future<String>> futures =
wrapped.invokeAll(Arrays.asList(callable1, callable2), 10, TimeUnit.SECONDS);
assertThat(futures.get(0).get()).isEqualTo("foo");
assertThat(futures.get(1).get()).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}

@Test
void invokeAny() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2))).isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}

@Test
void invokeAnyTimeout() throws Exception {
AtomicReference<String> value1 = new AtomicReference<>();
AtomicReference<String> value2 = new AtomicReference<>();
Callable<String> callable1 =
() -> {
value1.set(Context.current().get(ANIMAL));
throw new IllegalStateException("callable2 wins");
};
Callable<String> callable2 =
() -> {
value2.set(Context.current().get(ANIMAL));
return "bar";
};
assertThat(wrapped.invokeAny(Arrays.asList(callable1, callable2), 10, TimeUnit.SECONDS))
.isEqualTo("bar");
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}
}

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class WrapScheduledExecutorService {

protected ScheduledExecutorService executor;
protected ScheduledExecutorService wrapped;
protected AtomicReference<String> value;

protected ScheduledExecutorService wrap(ScheduledExecutorService executorService) {
return CAT.wrap(executorService);
}

Expand Down Expand Up @@ -505,6 +645,58 @@ void invokeAnyTimeout() throws Exception {
assertThat(value1).hasValue("cat");
assertThat(value2).hasValue("cat");
}

@Test
void scheduleRunnable() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
assertThat(Futures.getUnchecked(wrapped.schedule(runnable, 1L, TimeUnit.MILLISECONDS)))
.isNull();
assertThat(value).hasValue("cat");
}

@Test
void scheduleCallable() {
Callable<String> callable =
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(Futures.getUnchecked(wrapped.schedule(callable, 1L, TimeUnit.MILLISECONDS)))
.isEqualTo("foo");
assertThat(value).hasValue("cat");
}

@Test
void scheduleAtFixedRate() throws Exception {
LongAdder longAdder = new LongAdder();
Runnable runnable =
() -> {
value.set(Context.current().get(ANIMAL));
longAdder.increment();
};
Future<?> future = wrapped.scheduleAtFixedRate(runnable, 1L, 2L, TimeUnit.NANOSECONDS);
assertThat(future).isNotNull();
Thread.sleep(5L);
future.cancel(true);
assertThat(longAdder.intValue()).isGreaterThan(1);
assertThat(value).hasValue("cat");
}

@Test
void scheduleWithFixedDelay() throws Exception {
LongAdder longAdder = new LongAdder();
Runnable runnable =
() -> {
value.set(Context.current().get(ANIMAL));
longAdder.increment();
};
Future<?> future = wrapped.scheduleWithFixedDelay(runnable, 1L, 2L, TimeUnit.NANOSECONDS);
assertThat(future).isNotNull();
Thread.sleep(5L);
future.cancel(true);
assertThat(longAdder.intValue()).isGreaterThan(1);
assertThat(value).hasValue("cat");
}
}

@Nested
Expand Down Expand Up @@ -533,9 +725,10 @@ void close() {

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class CurrentContextWrappingScheduledExecutorService extends WrapExecutorService {
class CurrentContextWrappingScheduledExecutorService extends WrapScheduledExecutorService {

@Override
protected ExecutorService wrap(ScheduledExecutorService executorService) {
protected ScheduledExecutorService wrap(ScheduledExecutorService executorService) {
return Context.taskWrapping(executorService);
}

Expand Down Expand Up @@ -669,54 +862,6 @@ void delegatesCleanupMethods() throws Exception {
}
}

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class WrapScheduledExecutorService extends WrapExecutorService {

private ScheduledExecutorService wrapScheduled;

@BeforeEach
void wrapScheduled() {
wrapScheduled = CAT.wrap(executor);
}

@Test
void scheduleRunnable() throws Exception {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
wrapScheduled.schedule(runnable, 0, TimeUnit.SECONDS).get();
assertThat(value).hasValue("cat");
}

@Test
void scheduleCallable() throws Exception {
Callable<String> callable =
() -> {
value.set(Context.current().get(ANIMAL));
return "foo";
};
assertThat(wrapScheduled.schedule(callable, 0, TimeUnit.SECONDS).get()).isEqualTo("foo");
assertThat(value).hasValue("cat");
}

@Test
void scheduleAtFixedRate() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
ScheduledFuture<?> future =
wrapScheduled.scheduleAtFixedRate(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
future.cancel(true);
}

@Test
void scheduleWithFixedDelay() {
Runnable runnable = () -> value.set(Context.current().get(ANIMAL));
ScheduledFuture<?> future =
wrapScheduled.scheduleWithFixedDelay(runnable, 0, 10, TimeUnit.SECONDS);
await().untilAsserted(() -> assertThat(value).hasValue("cat"));
future.cancel(true);
}
}

@Test
void emptyContext() {
assertThat(Context.root().get(new HashCollidingKey())).isEqualTo(null);
Expand Down
Loading