Skip to content

Commit

Permalink
Merge pull request #15 from uavorg/master
Browse files Browse the repository at this point in the history
merge master
  • Loading branch information
BeliefYou8 committed Aug 31, 2018
2 parents 55ef39a + 429ba46 commit bc696c9
Show file tree
Hide file tree
Showing 33 changed files with 731 additions and 549 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public Status process() {
job.put("reader", reader);
job.put("TailLogcomp", this);

executor.submitTask(job);
executor.submit(job);

job.waitAllTaskDone();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,11 @@ else if (compInfo.containsKey("anno")) {

Map<String, Object> annoWebService = (Map<String, Object>) compDesInfo.get("javax.jws.WebService");

String serviceName = (String) annoWebService.get("serviceName");
String serviceName = null;

if (null != annoWebService) {
serviceName = (String) annoWebService.get("serviceName");
}

if (StringHelper.isEmpty(serviceName)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,37 @@

package com.creditease.agent.spi;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;

public interface IForkjoinWorker {

public String getName();

public String getFeature();

public void submitTask(AbstractPartitionJob task);
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);

public <T> ForkJoinTask<T> submit(Callable<T> task);

public <T> ForkJoinTask<T> submit(Runnable task, T result);

public ForkJoinTask<?> submit(Runnable task);

public void shutdown();

public void getParallelism();
public int getParallelism();

public void getRunningThreadCount();
public int getRunningThreadCount();

public void getPoolSize();
public int getPoolSize();

public void getQueuedTaskCount();
public long getQueuedTaskCount();

@Override
public String toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@

package com.creditease.agent.spi;

import java.util.Date;
import java.util.Map;

public interface ITimerWorkManager {

public boolean scheduleWork(String workName, AbstractTimerWork r, long delay, long period);

public boolean scheduleWork(String workName, AbstractTimerWork r, Date firstDate, long period);

public boolean scheduleWorkInPeriod(String workName, AbstractTimerWork r, Date firstDate, long period);

public boolean scheduleWorkInPeriod(String workName, AbstractTimerWork r, long delay, long period);

public boolean cancel(String workName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@

package com.creditease.agent.workqueue;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;

import com.creditease.agent.spi.AbstractComponent;
import com.creditease.agent.spi.AbstractPartitionJob;
import com.creditease.agent.spi.IForkjoinWorker;

/**
Expand All @@ -42,13 +46,49 @@ public SystemForkjoinWorker(String name, String feature, int maxthreadCount) {
executor = new ForkJoinPool(maxthreadCount);
}

/*
* invokeAll方法
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {

return executor.invokeAll(tasks);
}

/*
* submit方法
*/
@Override
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {

return executor.submit(task);
}

/*
* submit方法
*/
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {

return executor.submit(task);
}

/*
* submit方法
*/

@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {

return executor.submit(task, result);
}

/*
* submit方法
*/
@Override
public void submitTask(AbstractPartitionJob task) {
public ForkJoinTask<?> submit(Runnable task) {

executor.submit(task);
return executor.submit(task);
}

/*
Expand All @@ -66,36 +106,36 @@ public void shutdown() {
* parallelism level of this pool
*/
@Override
public void getParallelism() {
public int getParallelism() {

executor.getParallelism();
return executor.getParallelism();
}

/*
* Returns an estimate of the number of worker threads
*/
@Override
public void getRunningThreadCount() {
public int getRunningThreadCount() {

executor.getRunningThreadCount();
return executor.getRunningThreadCount();
}

/*
* Returns the number of worker threads
*/
@Override
public void getPoolSize() {
public int getPoolSize() {

executor.getPoolSize();
return executor.getPoolSize();
}

/*
* Returns the number of queued tasks
*/
@Override
public void getQueuedTaskCount() {
public long getQueuedTaskCount() {

executor.getQueuedTaskCount();
return executor.getQueuedTaskCount();
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*-
* <<
* UAVStack
* ==
* Copyright (C) 2016 - 2017 UAVStack
* ==
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* >>
*/

package com.creditease.agent.workqueue;

import java.util.Collections;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.creditease.agent.spi.AbstractComponent;
import com.creditease.agent.spi.AbstractTimerWork;
import com.creditease.agent.spi.ITimerWorkManager;

public class SystemScheduledWorkMgr extends AbstractComponent implements ITimerWorkManager {

private final static int DEFAULT_POOL_SIZE = 2;
private ScheduledThreadPoolExecutor scheduledPool;

private Map<String, AbstractTimerWork> tasksMap = new ConcurrentHashMap<>();

/**
* Create a scheduled work manager with preset thread pool core size. The initial pool size here is 5.
*
* @param cName
* @param feature
*/
public SystemScheduledWorkMgr(String cName, String feature) {

this(cName, feature, DEFAULT_POOL_SIZE);
}

public SystemScheduledWorkMgr(String cName, String feature, int coreSize) {

super(cName, feature);
this.scheduledPool = new ScheduledThreadPoolExecutor(coreSize, new ThreadPoolExecutor.CallerRunsPolicy());
}

/**
* @return boolean, when submit a task already. submitted before, return false. else return true.
*
* @throws IllegalArgumentException
* when input parameters are null or period less than 0
*/
@Override
public boolean scheduleWork(String workName, AbstractTimerWork r, long delay, long period) {

if (workName == null || "".equals(workName) || r == null || period < 0) {
throw new IllegalArgumentException("Wrong args for submitting a timer task.");
}

// if task already submitted
if (tasksMap.containsKey(workName)) {
return false;
}

TimerTask timerTask = createTimerTask(workName, r);

this.scheduledPool.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
return true;
}

@Override
public boolean scheduleWorkInPeriod(String workName, AbstractTimerWork r, long delay, long period) {

if (workName == null || "".equals(workName) || r == null || period < 0) {
throw new IllegalArgumentException("Wrong args for submitting a timer task.");
}

if (tasksMap.containsKey(workName)) {
return false;
}

TimerTask timerTask = createTimerTask(workName, r);

this.scheduledPool.scheduleWithFixedDelay(timerTask, delay, period, TimeUnit.MILLISECONDS);
return true;
}

@Override
public boolean cancel(String workName) {

if (workName == null || "".equals(workName)) {
return false;
}

if (this.tasksMap.containsKey(workName)) {
AbstractTimerWork timerWork = tasksMap.get(workName);
timerWork.cancel();
this.tasksMap.remove(workName);
return true;
}
return false;
}

@Override
public void shutdown() {

if (this.tasksMap.size() > 0) {
for (AbstractTimerWork timerWork : tasksMap.values()) {
timerWork.cancel();
}
this.tasksMap.clear();
}
scheduledPool.shutdown();
}

@Override
public Map<String, AbstractTimerWork> getAllTimerWork() {

return Collections.unmodifiableMap(tasksMap);
}

private TimerTask createTimerTask(String workName, AbstractTimerWork r) {

final AbstractTimerWork task = r;
TimerTask timerTask = new TimerTask() {

@Override
public void run() {

try {
task.run();
}
catch (Throwable e) {
log.err(this, "Timer Worker[" + task.getName() + "] runs FAIL.", e);
}
}

};

r.setCurrentTimerTask(timerTask);

this.tasksMap.put(workName, r);

return timerTask;
}

}
Loading

0 comments on commit bc696c9

Please sign in to comment.