Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper class to capture context using ScheduledExecutorService #6712

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Gradle
build
.gradle
.kotlin
local.properties
out/

Expand Down
18 changes: 18 additions & 0 deletions context/src/main/java/io/opentelemetry/context/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ static ExecutorService taskWrapping(ExecutorService executorService) {
return new CurrentContextExecutorService(executorService);
}

/**
* Returns an {@link ScheduledExecutorService} which delegates to the provided {@code
* executorService}, wrapping all invocations of {@link ExecutorService} methods such as {@link
* ExecutorService#execute(Runnable)} or {@link ExecutorService#submit(Runnable)} with the
* {@linkplain Context#current() current context} at the time of invocation.
*
* <p>This is generally used to create an {@link ScheduledExecutorService} which will forward the
* {@link Context} during an invocation to another thread. For example, you may use something like
* {@code ScheduledExecutorService dbExecutor = Context.wrapTasks(threadPool)} to ensure calls
* like {@code dbExecutor.execute(() -> database.query())} have {@link Context} available on the
* thread executing database queries.
*
* @since 1.43.0
*/
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService) {
ammachado marked this conversation as resolved.
Show resolved Hide resolved
return new CurrentContextScheduledExecutorService(executorService);
}

/**
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
* null} if there is no value for the key in this context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.context;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CurrentContextScheduledExecutorService extends ForwardingScheduledExecutorService {

CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate().submit(Context.current().wrap(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate().submit(Context.current().wrap(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate().submit(Context.current().wrap(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate().invokeAll(wrap(Context.current(), tasks));
}

@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate().invokeAll(wrap(Context.current(), tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate().invokeAny(wrap(Context.current(), tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate().invokeAny(wrap(Context.current(), tasks), timeout, unit);
}

@Override
public void execute(Runnable command) {
delegate().execute(Context.current().wrap(command));
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate().schedule(Context.current().wrap(command), delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate().schedule(Context.current().wrap(callable), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate()
.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate()
.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, unit);
}
}
ammachado marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.context;

import java.util.concurrent.ScheduledExecutorService;

/** A {@link ScheduledExecutorService} that implements methods that don't need {@link Context}. */
abstract class ForwardingScheduledExecutorService extends ForwardingExecutorService
ammachado marked this conversation as resolved.
Show resolved Hide resolved
implements ScheduledExecutorService {

private final ScheduledExecutorService delegate;

protected ForwardingScheduledExecutorService(ScheduledExecutorService delegate) {
super(delegate);
this.delegate = delegate;
}

@Override
ScheduledExecutorService delegate() {
return delegate;
}
}
Loading
Loading