Skip to content

Commit

Permalink
[WFCORE-4207]: Add capabilities in the Thread subsystem resource defi…
Browse files Browse the repository at this point in the history
…nitions.

* Adding capabilities and registering services with the new names.
* UnboundedQueueThreadPool can be created with core threads allowed to timeout.

JIRA: https://issues.jboss.org/browse/WFCORE-4207
  • Loading branch information
ehsavoie committed Jan 14, 2019
1 parent 5012d46 commit 5ff07e4
Show file tree
Hide file tree
Showing 26 changed files with 492 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
*/
package org.jboss.as.threads;



import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.capability.RuntimeCapability;
import org.jboss.as.threads.ThreadPoolManagementUtils.BoundedThreadPoolParameters;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.ServiceName;
Expand Down Expand Up @@ -58,21 +61,28 @@ public class BoundedQueueThreadPoolAdd extends AbstractAddStepHandler {
private final ThreadFactoryResolver threadFactoryResolver;
private final HandoffExecutorResolver handoffExecutorResolver;
private final ServiceName serviceNameBase;
private final RuntimeCapability<Void> capability;

public BoundedQueueThreadPoolAdd(boolean blocking, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver, ServiceName serviceNameBase) {
this(blocking, threadFactoryResolver, handoffExecutorResolver, serviceNameBase, null);
}

public BoundedQueueThreadPoolAdd(boolean blocking, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver, ServiceName serviceNameBase,
RuntimeCapability<Void> capability) {
super(blocking ? BLOCKING_ATTRIBUTES : NON_BLOCKING_ATTRIBUTES);
this.blocking = blocking;
this.threadFactoryResolver = threadFactoryResolver;
this.handoffExecutorResolver = handoffExecutorResolver;
this.serviceNameBase = serviceNameBase;
this.capability= capability;
}

@Override
protected void performRuntime(final OperationContext context, final ModelNode operation, final ModelNode model) throws OperationFailedException {

final BoundedThreadPoolParameters params = ThreadPoolManagementUtils.parseBoundedThreadPoolParameters(context, operation, model, blocking);

final BoundedQueueThreadPoolService service = new BoundedQueueThreadPoolService(
params.getCoreThreads(),
params.getMaxThreads(),
Expand All @@ -81,8 +91,8 @@ protected void performRuntime(final OperationContext context, final ModelNode op
params.getKeepAliveTime(),
params.isAllowCoreTimeout());

ThreadPoolManagementUtils.installThreadPoolService(service, params.getName(), serviceNameBase,
params.getThreadFactory(), threadFactoryResolver, service.getThreadFactoryInjector(),
ThreadPoolManagementUtils.installThreadPoolService(service, params.getName(), capability, context.getCurrentAddress(),
serviceNameBase, params.getThreadFactory(), threadFactoryResolver, service.getThreadFactoryInjector(),
params.getHandoffExecutor(), handoffExecutorResolver, blocking ? null : service.getHandoffExecutorInjector(),
context.getServiceTarget());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.capability.RuntimeCapability;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceName;

Expand All @@ -44,24 +45,33 @@ public class BoundedQueueThreadPoolMetricsHandler extends ThreadPoolMetricsHandl
PoolAttributeDefinitions.QUEUE_SIZE);

public BoundedQueueThreadPoolMetricsHandler(final ServiceName serviceNameBase) {
super(METRICS, serviceNameBase);
this(null, serviceNameBase);
}

public BoundedQueueThreadPoolMetricsHandler(final RuntimeCapability capability, final ServiceName serviceNameBase) {
super(METRICS, capability, serviceNameBase);
}

@Override
protected void setResult(OperationContext context, final String attributeName, final Service<?> service)
throws OperationFailedException {
BoundedQueueThreadPoolService bounded = (BoundedQueueThreadPoolService) service;
if(attributeName.equals(CommonAttributes.CURRENT_THREAD_COUNT)) {
context.getResult().set(bounded.getCurrentThreadCount());
} else if (attributeName.equals(CommonAttributes.LARGEST_THREAD_COUNT)) {
context.getResult().set(bounded.getLargestThreadCount());
} else if (attributeName.equals(CommonAttributes.REJECTED_COUNT)) {
context.getResult().set(bounded.getRejectedCount());
} else if (attributeName.equals(CommonAttributes.QUEUE_SIZE)) {
context.getResult().set(bounded.getQueueSize());
} else {
// Programming bug. Throw a RuntimeException, not OFE, as this is not a client error
throw ThreadsLogger.ROOT_LOGGER.unsupportedBoundedQueueThreadPoolMetric(attributeName);
switch (attributeName) {
case CommonAttributes.CURRENT_THREAD_COUNT:
context.getResult().set(bounded.getCurrentThreadCount());
break;
case CommonAttributes.LARGEST_THREAD_COUNT:
context.getResult().set(bounded.getLargestThreadCount());
break;
case CommonAttributes.REJECTED_COUNT:
context.getResult().set(bounded.getRejectedCount());
break;
case CommonAttributes.QUEUE_SIZE:
context.getResult().set(bounded.getQueueSize());
break;
default:
// Programming bug. Throw a RuntimeException, not OFE, as this is not a client error
throw ThreadsLogger.ROOT_LOGGER.unsupportedBoundedQueueThreadPoolMetric(attributeName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

package org.jboss.as.threads;

import static org.jboss.as.threads.CommonAttributes.BLOCKING_BOUNDED_QUEUE_THREAD_POOL;
import static org.jboss.as.threads.CommonAttributes.BOUNDED_QUEUE_THREAD_POOL;

import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationStepHandler;
import org.jboss.as.controller.PathElement;
Expand All @@ -32,6 +35,8 @@

import java.util.Arrays;
import java.util.Collection;
import org.jboss.as.controller.SimpleResourceDefinition;
import org.jboss.as.controller.capability.RuntimeCapability;

/**
* {@link org.jboss.as.controller.ResourceDefinition} for a bounded queue thread pool resource.
Expand All @@ -48,47 +53,70 @@ public class BoundedQueueThreadPoolResourceDefinition extends PersistentResource

public static BoundedQueueThreadPoolResourceDefinition create(boolean blocking, boolean registerRuntimeOnly) {
if (blocking) {
return create(CommonAttributes.BLOCKING_BOUNDED_QUEUE_THREAD_POOL, ThreadsServices.STANDARD_THREAD_FACTORY_RESOLVER,
return create(BLOCKING_BOUNDED_QUEUE_THREAD_POOL,
ThreadsServices.getThreadFactoryResolver(BLOCKING_BOUNDED_QUEUE_THREAD_POOL),
null, ThreadsServices.EXECUTOR, registerRuntimeOnly);
} else {
return create(CommonAttributes.BOUNDED_QUEUE_THREAD_POOL, ThreadsServices.STANDARD_THREAD_FACTORY_RESOLVER,
ThreadsServices.STANDARD_HANDOFF_EXECUTOR_RESOLVER, ThreadsServices.EXECUTOR, registerRuntimeOnly);
return create(CommonAttributes.BOUNDED_QUEUE_THREAD_POOL,
ThreadsServices.getThreadFactoryResolver(BOUNDED_QUEUE_THREAD_POOL),
ThreadsServices.getHandoffExecutorResolver(BOUNDED_QUEUE_THREAD_POOL),
ThreadsServices.EXECUTOR, registerRuntimeOnly);
}
}

public static BoundedQueueThreadPoolResourceDefinition create(boolean blocking, String type, boolean registerRuntimeOnly) {
if (blocking) {
return create(type, ThreadsServices.STANDARD_THREAD_FACTORY_RESOLVER, null, ThreadsServices.EXECUTOR, registerRuntimeOnly);
return create(type, ThreadsServices.getThreadFactoryResolver(type), null, ThreadsServices.EXECUTOR, registerRuntimeOnly);
} else {
return create(type, ThreadsServices.STANDARD_THREAD_FACTORY_RESOLVER, ThreadsServices.STANDARD_HANDOFF_EXECUTOR_RESOLVER,
return create(type, ThreadsServices.getThreadFactoryResolver(type), ThreadsServices.getHandoffExecutorResolver(type),
ThreadsServices.EXECUTOR, registerRuntimeOnly);
}
}

public static BoundedQueueThreadPoolResourceDefinition create(String type, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver,
ServiceName poolNameBase, boolean registerRuntimeOnly) {
return create(PathElement.pathElement(type), threadFactoryResolver, handoffExecutorResolver,
ThreadsServices.createCapability(type, ManagedQueueExecutorService.class), poolNameBase,
registerRuntimeOnly);
}

public static BoundedQueueThreadPoolResourceDefinition create(PathElement path, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver,
RuntimeCapability<Void> capability,
ServiceName poolNameBase, boolean registerRuntimeOnly) {
final boolean blocking = handoffExecutorResolver == null;
final String resolverPrefix = blocking ? CommonAttributes.BLOCKING_BOUNDED_QUEUE_THREAD_POOL : CommonAttributes.BOUNDED_QUEUE_THREAD_POOL;
final BoundedQueueThreadPoolAdd addHandler = new BoundedQueueThreadPoolAdd(blocking, threadFactoryResolver, handoffExecutorResolver, poolNameBase);
final String resolverPrefix = blocking ? BLOCKING_BOUNDED_QUEUE_THREAD_POOL : BOUNDED_QUEUE_THREAD_POOL;
final BoundedQueueThreadPoolAdd addHandler = new BoundedQueueThreadPoolAdd(blocking, threadFactoryResolver,
handoffExecutorResolver, poolNameBase, capability);
final OperationStepHandler removeHandler = new BoundedQueueThreadPoolRemove(addHandler);
return new BoundedQueueThreadPoolResourceDefinition(blocking, registerRuntimeOnly, type, poolNameBase, resolverPrefix, addHandler, removeHandler);
return new BoundedQueueThreadPoolResourceDefinition(blocking, registerRuntimeOnly, path, capability,
poolNameBase, resolverPrefix, addHandler, removeHandler);
}

/**
* @deprecated This class is not designed for subclassing and having this constructor be accessible is a specific workaround for WFCORE-1623 that may be reverted at any time
*/
@Deprecated
protected BoundedQueueThreadPoolResourceDefinition(boolean blocking, boolean registerRuntimeOnly,
String type, ServiceName serviceNameBase, String resolverPrefix, OperationStepHandler addHandler,
OperationStepHandler removeHandler) {
super(PathElement.pathElement(type),
new ThreadPoolResourceDescriptionResolver(resolverPrefix, ThreadsExtension.RESOURCE_NAME, ThreadsExtension.class.getClassLoader()),
addHandler, removeHandler);
this(blocking, registerRuntimeOnly, PathElement.pathElement(type), ThreadsServices.createCapability(type, ManagedQueueExecutorService.class),
serviceNameBase, resolverPrefix, addHandler, removeHandler);
}

private BoundedQueueThreadPoolResourceDefinition(boolean blocking, boolean registerRuntimeOnly, PathElement path,
RuntimeCapability<Void> capability, ServiceName serviceNameBase,
String resolverPrefix, OperationStepHandler addHandler,
OperationStepHandler removeHandler) {
super(new SimpleResourceDefinition.Parameters(path, new ThreadPoolResourceDescriptionResolver(resolverPrefix,
ThreadsExtension.RESOURCE_NAME, ThreadsExtension.class.getClassLoader()))
.setAddHandler(addHandler)
.setRemoveHandler(removeHandler)
.setCapabilities(capability));
this.registerRuntimeOnly = registerRuntimeOnly;
this.blocking = blocking;
metricsHandler = new BoundedQueueThreadPoolMetricsHandler(serviceNameBase);
writeHandler = new BoundedQueueThreadPoolWriteAttributeHandler(blocking, serviceNameBase);
metricsHandler = new BoundedQueueThreadPoolMetricsHandler(capability, serviceNameBase);
writeHandler = new BoundedQueueThreadPoolWriteAttributeHandler(blocking, capability, serviceNameBase);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
package org.jboss.as.threads;


import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP_ADDR;

import java.util.concurrent.TimeUnit;

import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.operations.common.Util;
import org.jboss.as.controller.capability.RuntimeCapability;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
Expand All @@ -41,11 +40,13 @@
public class BoundedQueueThreadPoolWriteAttributeHandler extends ThreadsWriteAttributeOperationHandler {

private final ServiceName serviceNameBase;
private final RuntimeCapability capability;

public BoundedQueueThreadPoolWriteAttributeHandler(boolean blocking, ServiceName serviceNameBase) {
public BoundedQueueThreadPoolWriteAttributeHandler(boolean blocking, final RuntimeCapability capability, ServiceName serviceNameBase) {
super(blocking ? BoundedQueueThreadPoolAdd.BLOCKING_ATTRIBUTES : BoundedQueueThreadPoolAdd.NON_BLOCKING_ATTRIBUTES,
BoundedQueueThreadPoolAdd.RW_ATTRIBUTES);
this.serviceNameBase = serviceNameBase;
this.capability = capability;
}

@Override
Expand Down Expand Up @@ -86,9 +87,20 @@ protected void applyOperation(final OperationContext context, ModelNode model, S

@Override
protected ServiceController<?> getService(final OperationContext context, final ModelNode model) throws OperationFailedException {
final String name = Util.getNameFromAddress(model.require(OP_ADDR));
final ServiceName serviceName = serviceNameBase.append(name);
ServiceController<?> controller = context.getServiceRegistry(true).getService(serviceName);
final String name = context.getCurrentAddressValue();
ServiceName serviceName = null;
ServiceController<?> controller = null;
if(capability != null) {
serviceName = capability.getCapabilityServiceName(context.getCurrentAddress());
controller = context.getServiceRegistry(true).getService(serviceName);
if(controller != null) {
return controller;
}
}
if (serviceNameBase != null) {
serviceName = serviceNameBase.append(name);
controller = context.getServiceRegistry(true).getService(serviceName);
}
if(controller == null) {
throw ThreadsLogger.ROOT_LOGGER.boundedQueueThreadPoolServiceNotFound(serviceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public interface PoolAttributeDefinitions {
.build();

SimpleAttributeDefinition THREAD_FACTORY = new SimpleAttributeDefinitionBuilder(CommonAttributes.THREAD_FACTORY, ModelType.STRING, true)
.setCapabilityReference("org.wildfly.threads.thread-factory")
.setFlags(AttributeAccess.Flag.RESTART_ALL_SERVICES).build();

SimpleAttributeDefinition MAX_THREADS = new SimpleAttributeDefinitionBuilder(CommonAttributes.MAX_THREADS, ModelType.INT, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
*/
package org.jboss.as.threads;



import org.jboss.as.controller.AbstractAddStepHandler;
import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.capability.RuntimeCapability;
import org.jboss.as.threads.ThreadPoolManagementUtils.QueuelessThreadPoolParameters;
import org.jboss.dmr.ModelNode;
import org.jboss.msc.service.ServiceName;
Expand Down Expand Up @@ -54,14 +57,21 @@ public class QueuelessThreadPoolAdd extends AbstractAddStepHandler {
private final ThreadFactoryResolver threadFactoryResolver;
private final HandoffExecutorResolver handoffExecutorResolver;
private final ServiceName serviceNameBase;
private final RuntimeCapability<Void> capability;

public QueuelessThreadPoolAdd(boolean blocking, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver, ServiceName serviceNameBase) {
this(blocking, threadFactoryResolver, handoffExecutorResolver, serviceNameBase, null);
}

public QueuelessThreadPoolAdd(boolean blocking, ThreadFactoryResolver threadFactoryResolver,
HandoffExecutorResolver handoffExecutorResolver, ServiceName serviceNameBase) {
HandoffExecutorResolver handoffExecutorResolver, ServiceName serviceNameBase, RuntimeCapability<Void> capability) {
super(blocking ? BLOCKING_ATTRIBUTES : NON_BLOCKING_ATTRIBUTES);
this.blocking = blocking;
this.threadFactoryResolver = threadFactoryResolver;
this.handoffExecutorResolver = handoffExecutorResolver;
this.serviceNameBase = serviceNameBase;
this.capability = capability;
}

@Override
Expand All @@ -71,8 +81,8 @@ protected void performRuntime(final OperationContext context, final ModelNode op

final QueuelessThreadPoolService service = new QueuelessThreadPoolService(params.getMaxThreads(), blocking, params.getKeepAliveTime());

ThreadPoolManagementUtils.installThreadPoolService(service, params.getName(), serviceNameBase,
params.getThreadFactory(), threadFactoryResolver, service.getThreadFactoryInjector(),
ThreadPoolManagementUtils.installThreadPoolService(service, params.getName(), capability, context.getCurrentAddress(),
serviceNameBase, params.getThreadFactory(), threadFactoryResolver, service.getThreadFactoryInjector(),
params.getHandoffExecutor(), handoffExecutorResolver, blocking ? null : service.getHandoffExecutorInjector(),
context.getServiceTarget());
}
Expand Down
Loading

0 comments on commit 5ff07e4

Please sign in to comment.