1
0
mirror of https://gitee.com/dromara/hutool.git synced 2025-04-05 17:37:59 +08:00

add methods

This commit is contained in:
Looly 2022-03-31 21:30:32 +08:00
parent e187d1e168
commit 3987f5b7ee
7 changed files with 129 additions and 153 deletions

View File

@ -15,6 +15,7 @@
* 【core 】 增加NodeListIter、ResettableIter
* 【crypto 】 HmacAlgorithm增加SM4CMACissue#2206@Github
* 【http 】 增加HttpConfig响应支持拦截issue#2217@Github
* 【core 】 增加BlockPolicyThreadUtil增加newFixedExecutor方法pr#2231@Github
### 🐞Bug修复
* 【core 】 IdcardUtil#getCityCodeByIdCard位数问题issue#2224@Github

View File

@ -0,0 +1,28 @@
package cn.hutool.core.thread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 当任务队列过长时处于阻塞状态直到添加到队列中
* 如果阻塞过程中被中断就会抛出{@link InterruptedException}异常<br>
* 有时候在线程池内访问第三方接口只希望固定并发数去访问并且不希望丢弃任务时使用此策略队列满的时候会处于阻塞状态(例如刷库的场景)
*
* @author luozongle
* @since 5.8.0
*/
public class BlockPolicy implements RejectedExecutionHandler {
public BlockPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put(r);
} catch (InterruptedException ex) {
throw new RejectedExecutionException("Task " + r + " rejected from " + e);
}
}
}

View File

@ -242,7 +242,7 @@ public class ExecutorBuilder implements Builder<ThreadPoolExecutor> {
workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
}
final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, ThreadPoolExecutor.AbortPolicy::new);
RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, RejectPolicy.ABORT.getValue());
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
corePoolSize, //

View File

@ -21,7 +21,9 @@ public enum RejectPolicy {
/** 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) */
DISCARD_OLDEST(new ThreadPoolExecutor.DiscardOldestPolicy()),
/** 由主线程来直接执行 */
CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy());
CALLER_RUNS(new ThreadPoolExecutor.CallerRunsPolicy()),
/** 当任务队列过长时处于阻塞状态,直到添加到队列中,固定并发数去访问,并且不希望丢弃任务时使用此策略 */
BLOCK(new BlockPolicy());
private final RejectedExecutionHandler value;

View File

@ -15,24 +15,34 @@ import java.util.concurrent.atomic.AtomicLong;
* @author looly
* @since 4.1.9
*/
public class ThreadFactoryBuilder implements Builder<ThreadFactory>{
public class ThreadFactoryBuilder implements Builder<ThreadFactory> {
private static final long serialVersionUID = 1L;
/** 用于线程创建的线程工厂类 */
/**
* 用于线程创建的线程工厂类
*/
private ThreadFactory backingThreadFactory;
/** 线程名的前缀 */
/**
* 线程名的前缀
*/
private String namePrefix;
/** 是否守护线程默认false */
/**
* 是否守护线程默认false
*/
private Boolean daemon;
/** 线程优先级 */
/**
* 线程优先级
*/
private Integer priority;
/** 未捕获异常处理器 */
/**
* 未捕获异常处理器
*/
private UncaughtExceptionHandler uncaughtExceptionHandler;
/**
* 创建{@link ThreadFactoryBuilder}
* 创建{@code ThreadFactoryBuilder}
*
* @return {@link ThreadFactoryBuilder}
* @return {@code ThreadFactoryBuilder}
*/
public static ThreadFactoryBuilder create() {
return new ThreadFactoryBuilder();
@ -115,7 +125,7 @@ public class ThreadFactoryBuilder implements Builder<ThreadFactory>{
/**
* 构建
*
* @param builder {@link ThreadFactoryBuilder}
* @param builder {@code ThreadFactoryBuilder}
* @return {@link ThreadFactory}
*/
private static ThreadFactory build(ThreadFactoryBuilder builder) {

View File

@ -1,7 +1,5 @@
package cn.hutool.core.thread;
import cn.hutool.core.thread.rejected.RejectedExecutionHandlerUtility;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@ -43,114 +41,6 @@ public class ThreadUtil {
return builder.build();
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue默认队列大小为1024
* </pre>
*
* @param nThreads 线程池大小
* @param threadNamePrefix 线程名称前缀
* @return ExecutorService
*/
public static ExecutorService newFixedExecutor(int nThreads, String threadNamePrefix) {
ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build();
ExecutorBuilder builder = ExecutorBuilder.create().setThreadFactory(threadFactory);
if (nThreads > 0) {
builder.setCorePoolSize(nThreads)
.setMaxPoolSize(nThreads);
}
return builder.build();
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue默认队列大小为1024
* 3. 当执行拒绝策略的时候会处于阻塞状态直到能添加到队列中或者被{@link Thread#interrupt()}中断
* </pre>
*
* @param nThreads 线程池大小
* @param threadNamePrefix 线程名称前缀
* @return ExecutorService
*/
public static ExecutorService newFixedBlockedExecutor(int nThreads, String threadNamePrefix) {
ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build();
ExecutorBuilder builder = ExecutorBuilder.create()
.setCorePoolSize(nThreads)
.setMaxPoolSize(nThreads)
.setThreadFactory(threadFactory)
.setHandler(new RejectedExecutionHandlerUtility.BlockPolicy());
return builder.build();
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue
* </pre>
*
* @param nThreads 线程池大小
* @param maximumQueueSize 队列大小
* @param threadNamePrefix 线程名称前缀
* @return ExecutorService
*/
public static ExecutorService newFixedExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) {
return newFixedExecutor(nThreads, maximumQueueSize, threadNamePrefix, null);
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue
* 3. 当执行拒绝策略的时候会处于阻塞状态直到能添加到队列中或者被{@link Thread#interrupt()}中断
* </pre>
*
* @param nThreads 线程池大小
* @param threadNamePrefix 线程名称前缀
* @return ExecutorService
*/
public static ExecutorService newFixedBlockingExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix) {
return newFixedExecutor(nThreads,
maximumQueueSize,
threadNamePrefix,
new RejectedExecutionHandlerUtility.BlockPolicy());
}
/**
* 获得一个新的线程池默认策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue
* </pre>
*
* @param nThreads 线程池大小
* @param maximumQueueSize 队列大小
* @param threadNamePrefix 线程名称前缀
* @param handler 拒绝策略
* @return ExecutorService
*/
public static ExecutorService newFixedExecutor(int nThreads,
int maximumQueueSize,
String threadNamePrefix,
RejectedExecutionHandler handler) {
ThreadFactory threadFactory = ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build();
ExecutorBuilder builder = ExecutorBuilder.create()
.setCorePoolSize(nThreads)
.setMaxPoolSize(nThreads)
.setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize))
.setThreadFactory(threadFactory)
.setHandler(handler);
return builder.build();
}
/**
* 获得一个新的线程池默认的策略如下
* <pre>
@ -240,6 +130,71 @@ public class ThreadUtil {
return ExecutorBuilder.create().setCorePoolSize(poolSize).setMaxPoolSize(poolSize).setKeepAliveTime(0L).build();
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue默认队列大小为1024
* 3. 如果isBlocked为{code true}当执行拒绝策略的时候会处于阻塞状态直到能添加到队列中或者被{@link Thread#interrupt()}中断
* </pre>
*
* @param nThreads 线程池大小
* @param threadNamePrefix 线程名称前缀
* @param isBlocked 是否使用{@link BlockPolicy}策略
* @return ExecutorService
* @author luozongle
* @since 5.8.0
*/
public static ExecutorService newFixedExecutor(int nThreads, String threadNamePrefix, boolean isBlocked) {
return newFixedExecutor(nThreads, 1024, threadNamePrefix, isBlocked);
}
/**
* 获取一个新的线程池默认的策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue
* 3. 当执行拒绝策略的时候会处于阻塞状态直到能添加到队列中或者被{@link Thread#interrupt()}中断
* </pre>
*
* @param nThreads 线程池大小
* @param threadNamePrefix 线程名称前缀
* @return ExecutorService
* @author luozongle
* @since 5.8.0
*/
public static ExecutorService newFixedExecutor(int nThreads, int maximumQueueSize, String threadNamePrefix, boolean isBlocked) {
return newFixedExecutor(nThreads, maximumQueueSize, threadNamePrefix,
(isBlocked ? RejectPolicy.BLOCK : RejectPolicy.ABORT).getValue());
}
/**
* 获得一个新的线程池默认策略如下<br>
* <pre>
* 1. 核心线程数与最大线程数为nThreads指定的大小
* 2. 默认使用LinkedBlockingQueue
* </pre>
*
* @param nThreads 线程池大小
* @param maximumQueueSize 队列大小
* @param threadNamePrefix 线程名称前缀
* @param handler 拒绝策略
* @return ExecutorService
* @author luozongle
* @since 5.8.0
*/
public static ExecutorService newFixedExecutor(int nThreads,
int maximumQueueSize,
String threadNamePrefix,
RejectedExecutionHandler handler) {
return ExecutorBuilder.create()
.setCorePoolSize(nThreads).setMaxPoolSize(nThreads)
.setWorkQueue(new LinkedBlockingQueue<>(maximumQueueSize))
.setThreadFactory(createThreadFactory(threadNamePrefix))
.setHandler(handler)
.build();
}
/**
* 直接在公共线程池中执行线程
*
@ -501,6 +456,18 @@ public class ThreadUtil {
return ThreadFactoryBuilder.create();
}
/**
* 创建自定义线程名称前缀的{@link ThreadFactory}
*
* @param threadNamePrefix 线程名称前缀
* @return {@link ThreadFactory}
* @see ThreadFactoryBuilder#build()
* @since 5.8.0
*/
public static ThreadFactory createThreadFactory(String threadNamePrefix) {
return ThreadFactoryBuilder.create().setNamePrefix(threadNamePrefix).build();
}
/**
* 结束线程调用此方法后线程将抛出 {@link InterruptedException}异常
*

View File

@ -1,32 +0,0 @@
package cn.hutool.core.thread.rejected;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池拒绝策略工具类
*
* @author luozongle
*/
public class RejectedExecutionHandlerUtility {
/**
* 当任务队列过长时处于阻塞状态直到添加到队列中
* 如果阻塞过程中被中断就会抛出{@link InterruptedException}异常
*/
public static class BlockPolicy implements RejectedExecutionHandler {
public BlockPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put(r);
} catch (InterruptedException ex) {
throw new RejectedExecutionException("Task " + r + " rejected from " + e);
}
}
}
}