public class RecyclableBatchThreadPoolExecutor extends Object
1.数据分批并行处理 2.主线程、线程池混合执行批处理任务,主线程空闲时会尝试召回线程池队列中的任务执行 3.线程安全,可用同时执行多个任务,线程池满载时,效率与单线程模式相当,无阻塞风险,无脑提交任务即可适用场景:
1.批量处理数据且需要同步结束的场景,能一定程度上提高吞吐量、防止任务堆积process(List, int, Function)
2.普通查询接口加速processByWarp(Warp[])
Modifier and Type | Class and Description |
---|---|
static class |
RecyclableBatchThreadPoolExecutor.Warp<R>
处理逻辑包装类
|
Constructor and Description |
---|
RecyclableBatchThreadPoolExecutor(ExecutorService executor)
自定义线程池,一般不需要使用
|
RecyclableBatchThreadPoolExecutor(int poolSize)
构造
|
RecyclableBatchThreadPoolExecutor(int poolSize,
String threadPoolPrefix)
建议的构造方法
1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略
2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
|
Modifier and Type | Method and Description |
---|---|
ExecutorService |
getExecutor()
获取线程池
|
<T,R> List<R> |
process(List<T> data,
int batchSize,
Function<T,R> processor)
分批次处理数据
1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果
Function 返回null即可
2. |
List<RecyclableBatchThreadPoolExecutor.Warp<?>> |
processByWarp(List<RecyclableBatchThreadPoolExecutor.Warp<?>> warps)
处理Warp集合
|
List<RecyclableBatchThreadPoolExecutor.Warp<?>> |
processByWarp(RecyclableBatchThreadPoolExecutor.Warp<?>... warps)
处理Warp数组
Warp<String> warp1 = Warp.of(this::select1);
Warp<List<String>> warp2 = Warp.of(this::select2);
executor.processByWarp(warp1, warp2);
String r1 = warp1.get();
List<String> r2 = warp2.get();
|
void |
shutdown()
关闭线程池
|
public RecyclableBatchThreadPoolExecutor(int poolSize)
poolSize
- 线程池大小public RecyclableBatchThreadPoolExecutor(int poolSize, String threadPoolPrefix)
1.使用无界队列,主线程会召回队列中的任务执行,不会有任务堆积,无需考虑拒绝策略 2.假如在web场景中请求量过大导致oom,不使用此工具也会有同样的结果,甚至更严重,应该对请求做限制或做其他优化
poolSize
- 线程池大小threadPoolPrefix
- 线程名前缀public RecyclableBatchThreadPoolExecutor(ExecutorService executor)
executor
- 线程池public void shutdown()
public ExecutorService getExecutor()
public <T,R> List<R> process(List<T> data, int batchSize, Function<T,R> processor)
1.所有批次执行完成后会过滤null并返回合并结果,保持输入数据顺序,不需要结果Function
返回null即可 2.Function
需自行处理异常、保证线程安全 3.原始数据在分片后可能被外部修改,导致批次数据不一致,如有必要,传参之前进行数据拷贝 4.主线程会参与处理批次数据,如果要异步执行任务请使用普通线程池
T
- 输入数据类型R
- 输出数据类型data
- 待处理数据集合batchSize
- 每批次数据量processor
- 单条数据处理函数public List<RecyclableBatchThreadPoolExecutor.Warp<?>> processByWarp(RecyclableBatchThreadPoolExecutor.Warp<?>... warps)
Warp<String> warp1 = Warp.of(this::select1);
Warp<List<String>> warp2 = Warp.of(this::select2);
executor.processByWarp(warp1, warp2);
String r1 = warp1.get();
List<String> r2 = warp2.get();
warps
- Warp数组public List<RecyclableBatchThreadPoolExecutor.Warp<?>> processByWarp(List<RecyclableBatchThreadPoolExecutor.Warp<?>> warps)
warps
- Warp集合Copyright © 2025. All rights reserved.