Skip to content

Commit

Permalink
[WFCORE-3102]: CloseableLocalClient
Browse files Browse the repository at this point in the history
Closing the LocalClient will interrupt running operations.
  • Loading branch information
ehsavoie committed Jun 12, 2018
1 parent 9517cba commit ad98b11
Showing 1 changed file with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -113,6 +116,7 @@ private static class LocalClient implements LocalModelControllerClient {
private final Supplier<SecurityIdentity> securityIdentitySupplier;
private final Executor executor;
private final boolean forUserCalls;
private final Set<AtomicReference<Thread>> threads = Collections.synchronizedSet(new HashSet<>());

private LocalClient(ModelController modelController, Supplier<SecurityIdentity> securityIdentitySupplier, Executor executor, boolean forUserCalls) {
this.modelController = modelController;
Expand All @@ -122,8 +126,15 @@ private LocalClient(ModelController modelController, Supplier<SecurityIdentity>
}

@Override
public void close() {
// whatever
public void close() {
synchronized (threads) {
for(AtomicReference<Thread> threadRef : threads){
Thread thread = threadRef.get();
if (thread != null) {
thread.interrupt();
}
}
}
}

@Override
Expand Down Expand Up @@ -166,6 +177,7 @@ private <T> AsyncFuture<T> executeAsync(final ModelNode op, final OperationMessa

final ModelNode operation = sanitizeOperation(op);
final AtomicReference<Thread> opThread = new AtomicReference<>();
threads.add(opThread);
final ResponseFuture<T> responseFuture = new ResponseFuture<>(opThread, responseConverter, executor);

final SecurityIdentity securityIdentity = securityIdentitySupplier.get();
Expand Down Expand Up @@ -194,6 +206,7 @@ public OperationResponse run() {
} finally {
synchronized (opThread) {
opThread.set(null);
threads.remove(opThread);
opThread.notifyAll();
}
}
Expand Down

0 comments on commit ad98b11

Please sign in to comment.