Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper class to capture context using ScheduledExecutorService #6712

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
18 changes: 18 additions & 0 deletions context/src/main/java/io/opentelemetry/context/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ static ExecutorService taskWrapping(ExecutorService executorService) {
return new CurrentContextExecutorService(executorService);
}

/**
* Returns an {@link ScheduledExecutorService} which delegates to the provided {@code
* executorService}, wrapping all invocations of {@link ExecutorService} methods such as {@link
* ExecutorService#execute(Runnable)} or {@link ExecutorService#submit(Runnable)} with the
* {@linkplain Context#current() current context} at the time of invocation.
*
* <p>This is generally used to create an {@link ScheduledExecutorService} which will forward the
* {@link Context} during an invocation to another thread. For example, you may use something like
* {@code ScheduledExecutorService dbExecutor = Context.wrapTasks(threadPool)} to ensure calls
* like {@code dbExecutor.execute(() -> database.query())} have {@link Context} available on the
* thread executing database queries.
*
* @since 1.43.0
*/
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService) {
ammachado marked this conversation as resolved.
Show resolved Hide resolved
return new CurrentContextScheduledExecutorService(executorService);
}

/**
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
* null} if there is no value for the key in this context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.context;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CurrentContextScheduledExecutorService extends ForwardingScheduledExecutorService {

CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(Context.current().wrap(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(Context.current().wrap(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(Context.current().wrap(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate().invokeAll(wrap(Context.current(), tasks));
}

@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate().invokeAll(wrap(Context.current(), tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate().invokeAny(wrap(Context.current(), tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(wrap(Context.current(), tasks), timeout, unit);
}

@Override
public void execute(Runnable command) {
delegate().execute(Context.current().wrap(command));
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(Context.current().wrap(command), delay, unit);

Check warning on line 71 in context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java#L71

Added line #L71 was not covered by tests
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(Context.current().wrap(callable), delay, unit);

Check warning on line 76 in context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java#L76

Added line #L76 was not covered by tests
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate()
.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, unit);

Check warning on line 83 in context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java#L82-L83

Added lines #L82 - L83 were not covered by tests
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate()
.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, unit);

Check warning on line 90 in context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/CurrentContextScheduledExecutorService.java#L89-L90

Added lines #L89 - L90 were not covered by tests
}
}
ammachado marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.context;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** A {@link ScheduledExecutorService} that implements methods that don't need {@link Context}. */
abstract class ForwardingScheduledExecutorService implements ScheduledExecutorService {

private final ScheduledExecutorService delegate;

protected ForwardingScheduledExecutorService(ScheduledExecutorService delegate) {
this.delegate = delegate;
}

ScheduledExecutorService delegate() {
return delegate;
}

@Override
public final void shutdown() {
delegate.shutdown();
}

Check warning on line 31 in context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java#L30-L31

Added lines #L30 - L31 were not covered by tests

@Override
public final List<Runnable> shutdownNow() {
return delegate.shutdownNow();

Check warning on line 35 in context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java#L35

Added line #L35 was not covered by tests
}

@Override
public final boolean isShutdown() {
return delegate.isShutdown();

Check warning on line 40 in context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java#L40

Added line #L40 was not covered by tests
}

@Override
public final boolean isTerminated() {
return delegate.isTerminated();

Check warning on line 45 in context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java#L45

Added line #L45 was not covered by tests
}

@Override
public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);

Check warning on line 50 in context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java

View check run for this annotation

Codecov / codecov/patch

context/src/main/java/io/opentelemetry/context/ForwardingScheduledExecutorService.java#L50

Added line #L50 was not covered by tests
}

protected static <T> Collection<? extends Callable<T>> wrap(
Context context, Collection<? extends Callable<T>> tasks) {
List<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks) {
wrapped.add(context.wrap(task));
}
return wrapped;
}
}
28 changes: 28 additions & 0 deletions context/src/test/java/io/opentelemetry/context/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ class WrapExecutorService {
protected ExecutorService wrapped;
protected AtomicReference<String> value;

protected ExecutorService wrap(ScheduledExecutorService executorService) {
return CAT.wrap(executorService);
}

protected ExecutorService wrap(ExecutorService executorService) {
return CAT.wrap(executorService);
}
Expand Down Expand Up @@ -525,6 +529,30 @@ void close() {
}
}

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class CurrentContextWrappingScheduledExecutorService extends WrapExecutorService {
@Override
protected ExecutorService wrap(ScheduledExecutorService executorService) {
return Context.taskWrapping(executorService);
}

private Scope scope;

@BeforeEach
// Closed in AfterEach
@SuppressWarnings("MustBeClosedChecker")
void makeCurrent() {
scope = CAT.makeCurrent();
}

@AfterEach
void close() {
scope.close();
scope = null;
}
}

@Test
void keyToString() {
assertThat(ANIMAL.toString()).isEqualTo("animal");
Expand Down
4 changes: 3 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-context.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Comparing source compatibility of opentelemetry-context-1.43.0-SNAPSHOT.jar against opentelemetry-context-1.42.1.jar
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.context.Context (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.util.concurrent.ScheduledExecutorService taskWrapping(java.util.concurrent.ScheduledExecutorService)
Loading