public class RecyclableBatchThreadPoolExecutor extends Object
1.数据分批并行处理 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可适用场景:
1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积process(List, int, Function)2.普通查询接口加速processByWarp(Warp[])
| Modifier and Type | Class and Description | 
|---|---|
| static class  | RecyclableBatchThreadPoolExecutor.Warp<R>处理逻辑包装类 | 
| Constructor and Description | 
|---|
| RecyclableBatchThreadPoolExecutor(ExecutorService executor)自定义线程池,一般不需要使用 | 
| RecyclableBatchThreadPoolExecutor(int poolSize)构造 | 
| RecyclableBatchThreadPoolExecutor(int poolSize,
                                 String threadPoolPrefix)建议的构造方法
 
 1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
  | 
| Modifier and Type | Method and Description | 
|---|---|
| ExecutorService | getExecutor()获取线程池 | 
| <T,R> List<R> | process(List<T> data,
       int batchSize,
       Function<T,R> processor)分批次处理数据
 
 1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果 Function返回null即可
 2. | 
| List<RecyclableBatchThreadPoolExecutor.Warp<?>> | processByWarp(List<RecyclableBatchThreadPoolExecutor.Warp<?>> warps)处理Warp集合 | 
| List<RecyclableBatchThreadPoolExecutor.Warp<?>> | processByWarp(RecyclableBatchThreadPoolExecutor.Warp<?>... warps)处理Warp数组
  
 Warp<String> warp1 = Warp.of(this::select1);
 Warp<List<String>> warp2 = Warp.of(this::select2);
 executor.processByWarp(warp1, warp2);
 String r1 = warp1.get();
 List<String> r2 = warp2.get();
  | 
| void | shutdown()关闭线程池 | 
public RecyclableBatchThreadPoolExecutor(int poolSize)
poolSize - 线程池大小public RecyclableBatchThreadPoolExecutor(int poolSize,
                                         String threadPoolPrefix)
1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
poolSize - 线程池大小threadPoolPrefix - 线程名前缀public RecyclableBatchThreadPoolExecutor(ExecutorService executor)
executor - 线程池public void shutdown()
public ExecutorService getExecutor()
public <T,R> List<R> process(List<T> data, int batchSize, Function<T,R> processor)
1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果Function返回null即可 2.Function需自行处理异常、保证线程安全 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
T - 输入数据类型R - 输出数据类型data - 待处理数据集合batchSize - 每批次数据量processor - 单条数据处理函数public List<RecyclableBatchThreadPoolExecutor.Warp<?>> processByWarp(RecyclableBatchThreadPoolExecutor.Warp<?>... warps)
 Warp<String> warp1 = Warp.of(this::select1);
 Warp<List<String>> warp2 = Warp.of(this::select2);
 executor.processByWarp(warp1, warp2);
 String r1 = warp1.get();
 List<String> r2 = warp2.get();
 warps - Warp数组public List<RecyclableBatchThreadPoolExecutor.Warp<?>> processByWarp(List<RecyclableBatchThreadPoolExecutor.Warp<?>> warps)
warps - Warp集合Copyright © 2025. All rights reserved.