ElasticSearch 通用化批量数据同步器封装设计
ElasticSearch 通用化批量数据同步器封装设计
一、设计需要满足的功能场景
ES 现在也是很多公司会用到的工具了,用来做数据的快速查询,那么不可避免的就会遇到需要将数据同步到 ES 的情况,为了做封装我们需要考虑到几个方面的问题。
- 数据量: 数据同步那么自然涉及到同步的数据量问题。通常的做法是把所有的数据查询到缓存中,然后再依次丢到 ES 里。但是假如数据量极大的情况就会把内存撑爆。因此设计上我们需要考虑分页分片同步,每次查个500条就同步500条,用时间换空间。
- 效率问题: 既然分页分片,每个分片任务是互不影响的,因此可以考虑用多线程来提高处理效率。
- 通用性: 因为需要考虑通用性,因此分页接口需要调用方传入,作为规范约定,分页返回的数据必须与ES字段一致
二、约定
规范约定,分页返回的数据必须与ES字段一致,目的是为了保证数据的正确性
至于存储工具的封装,参考:ElasticSearch 通用化批量存储封装设计
三、问题
1.如何知道分页是否全部处理完成了?
这里采用一种假设法,假设我们一页500条数据,每批5个线程处理,那么我们每批的线程共会处理线程 500*5 = 2500 条数据,假如有 10001 条数据要处理,那么之前的 4 批,每次都会处理满 2500 条,仅最后一次只处理 1 条数据。因此当我们发现处理数据的数量无法达到最大值时,说明数据处理完。
2.我们如何做到每批 5 个线程,每个线程处理 500 条这种操作?并且还要统计每批处理的数据量?
我们可以使用 Fork/Join 框架来实现,Fork/Join 中的每个任务应该为互不干扰,互不影响的任务,因而以 5*500 为一组的任务进入 Fork/Join 中进行处理时,每 500 为一个线程的任务就是互补影响的,最后只需统计处理数量。
四、功能封装
1、接口函数定义
分页接口函数: 用来传分页查询接口,PageResultDTO 自定义的分页返回值,Pageable 分页参数
@FunctionalInterface
public interface PageFunction {
PageResultDTO page(Pageable pageable);
}
ES操作接口函数: ES 操作接口函数
@FunctionalInterface
public interface EsOperationFunction<T> {
void handle(List<T> data);
}
2、常量定义
/**
* @author Jinpeng Lin
* @description 全量分片常量
* @date 2022-05-30
*/
public class PageShardingConstants {
/**
* ForkJoinPool 线程池最大数据
*/
public static final int MAX_FORK_JOIN_POOL_SIZE = 10;
/**
* 失败重试次数
*/
public static final int ERROR_TIMES = 5;
/**
* 最小分片数量
*/
public static final int MIN_SHARDING_NUM = 1;
/**
* 最小分片大小,每个分片 N 条
*/
public static final int MIN_SHARDING_SIZE = 1;
/**
* 最大分片数量
*/
public static final int MAX_SHARDING_NUM = 10;
/**
* 最大分片大小,每个分片 N 条
*/
public static final int MAX_SHARDING_SIZE = 1000;
/**
* 默认分片数量
*/
public static final int DEFAULT_SHARDING_NUM = 4;
/**
* 默认分片大小,每个分片 N 条
*/
public static final int DEFAULT_SHARDING_SIZE = 500;
}
3、多线程分片处理器(Fork/Join 框架任务切分与处理)
我们用 Fork/Join 框架来作为分片处理器,Fork/Join 框架的好处在于在多线程处理中,可以帮我们实工作密取,空闲的线程会从繁忙线程的任务队列中,从任务队列后面获取一个任务去执行,以保证线程的利用率。
处理器的返回值为处理的数据总条数
● cutTask: 是否已经分片标识,任务拆分后,此参数为 true,表示无需再拆分
● page: 当前分片任务对应分页
● size: 当前分片处理条数,即每页的条数
● remainShardNum: 剩余分片数量,用于记录剩余还有多少分片要处理
● pageFunction: 分页接口函数
● esOperationFunction: ES操作函接口函数
/**
* @author Jinpeng Lin
* @description 分页处理组件
* @date 2022-05-30
*/
public class PageShardingHandle extends RecursiveTask<Integer> {
private static final long serialVersionUID = 416664792640153410L;
/**
* 是否已经分片
*/
private boolean cutTask;
/**
* 页码
*/
private Integer page;
/**
* 每页条数
*/
private Integer size;
/**
* 剩余分片数量
*/
private Integer remainShardNum;
/**
* 分页接口函数
*/
private PageFunction pageFunction;
/**
* ES操作接口函数
*/
private EsOperationFunction esOperationFunction;
public PageShardingHandle(Integer page, Integer size, Integer remainShardNum, PageFunction pageFunction,
EsOperationFunction esOperationFunction) {
this.cutTask = Boolean.FALSE;
this.page = page;
this.size = size;
this.remainShardNum = remainShardNum;
this.pageFunction = pageFunction;
this.esOperationFunction = esOperationFunction;
}
public PageShardingHandle(boolean cutTask, Integer page, Integer size, Integer remainShardNum,
PageFunction pageFunction, EsOperationFunction esOperationFunction) {
this.page = page;
this.size = size;
this.remainShardNum = remainShardNum;
this.cutTask = cutTask;
this.pageFunction = pageFunction;
this.esOperationFunction = esOperationFunction;
}
@Override
protected Integer compute() {
// 总条数
Integer total = 0;
// 已经任务进行分片,处理任务
if (this.cutTask) {
// 失败重试次数
int errorNum = PageShardingConstants.ERROR_TIMES;
// 失败原因
Exception exception = null;
// 分页对象
PageResultDTO result = null;
while (errorNum > 0) {
try {
// 查询分页的数据
result = this.pageFunction.page(BasePageable.of(this.page, this.size));
// ES处理
if (result.getResult().size() > 0) {
this.esOperationFunction.handle(result.getResult());
}
total = result.getResult().size();
break;
} catch (Exception e) {
errorNum--;
log.warn("ES剩余本地重试次数:{}次\n失败数据:{}\n失败异常:{}", errorNum,
result != null ? JSONUtil.toJsonStr(result) : "null", e.getMessage());
// 记录失败原因
if (errorNum == 0) {
exception = e;
}
}
}
// 依然存在异常,则抛出异常
if (exception != null) {
throw new RuntimeException(exception);
}
} else {
// 未分片,需要进行分片
List<PageShardingHandle> handles = new ArrayList<>();
while (this.remainShardNum > 0) {
// page:传递下一页
// remainShardNum:剩余分片数量减少
// cutTask:任务为已分片任务
handles.add(new PageShardingHandle(Boolean.TRUE, this.page++, this.size, --this.remainShardNum,
this.pageFunction, this.esOperationFunction));
}
// 提交分片任务
invokeAll(handles);
// 接收处理结果返回值,结果中汇总总条数
for (PageShardingHandle handle : handles) {
total += handle.join();
}
}
return total;
}
}
4、分页分片接口
/**
* 分页分片封装
*/
public interface PageShardingApi {
/**
* 同步ES(已知总条数的情况下,自动进行分片计算)
* @param total 总条数
* @param pageFunction 分页查询操作
* @param esIndexEnum ES对应索引
*/
void syncEsByTotal(Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum);
/**
* 同步ES(已知总条数的情况下,自动进行分片计算)
* @param size 每页条数
* @param total 总条数
* @param pageFunction 分页查询操作
* @param esIndexEnum ES对应索引
*/
void syncEsByTotal(Integer size, Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum);
/**
* 全量分片同步ES
* @param pageFunction 分页查询操作
* @param esIndexEnum ES对应索引
*/
void syncEs(PageFunction pageFunction, EsIndexEnum esIndexEnum);
/**
* 全量分片同步ES
* @param size 每页条数
* @param pageFunction 分页查询操作
* @param esIndexEnum ES对应索引
*/
void syncEs(Integer size, PageFunction pageFunction, EsIndexEnum esIndexEnum);
/**
* 全量分片同步ES
* @param size 每页条数
* @param shardNum 分片数量
* @param pageFunction 分页查询操作
* @param esIndexEnum ES对应索引
*/
void syncEs(Integer size, Integer shardNum, PageFunction pageFunction, EsIndexEnum esIndexEnum);
}
5、分页分片实现
EsIndexEnum 与 EsOperationApi 参考:ElasticSearch 通用化批量存储封装设计
/**
* @author Jinpeng Lin
* @description 分页分片封装
* @date 2022-05-30
*/
@Service
public class PageShardingApiImpl implements PageShardingApi {
@Resource
private EsOperationApi esOperationApi;
/**
* Redis 工具类,换成自己的
*/
@Resource
private RedisLockUtils redisLockUtils;
/**
* ForkJoinPool 线程池池子
*/
private static final List<ForkJoinPool> forkJoinPools =
new ArrayList<>(PageShardingConstants.MAX_FORK_JOIN_POOL_SIZE);
@Override
public void syncEsByTotal(Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
this.syncEsByTotal(null, total, pageFunction, esIndexEnum);
}
@Override
public void syncEsByTotal(Integer size, Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
// 如果发现没有总条数或者总条数为 0,不进行后续操作
if (total == null || total == 0) {
return;
}
if (size == null) {
size = PageShardingConstants.DEFAULT_SHARDING_SIZE;
}
Integer shardNum = total / size + (total % size == 0 ? 0 : 1);
this.syncEs(size, shardNum, pageFunction, esIndexEnum);
}
@Override
public void syncEs(PageFunction pageFunction, EsIndexEnum esIndexEnum) {
this.syncEs(null, null, pageFunction, esIndexEnum);
}
@Override
public void syncEs(Integer size, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
this.syncEs(size, null, pageFunction, esIndexEnum);
}
@Override
public void syncEs(Integer size, Integer shardNum, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
// 分片的数量不能大于最大数量
if (shardNum == null) {
shardNum = PageShardingConstants.DEFAULT_SHARDING_NUM;
}
if (size == null) {
size = PageShardingConstants.DEFAULT_SHARDING_SIZE;
}
if (shardNum > PageShardingConstants.MAX_SHARDING_NUM) {
shardNum = PageShardingConstants.MAX_SHARDING_NUM;
}
if (size > PageShardingConstants.MAX_SHARDING_SIZE) {
size = PageShardingConstants.MAX_SHARDING_SIZE;
}
// 每次分片的最大查询到的数量:分片数量 * 每个分片的条数
Integer maxPageSize = size * shardNum;
// 判断池子是否满了
ForkJoinPool forkJoinPool = null;
try {
// 池子上锁
forkJoinPoolLock.lock();
// 构建线程池
try {
// 池子已经满了,等待
while (forkJoinPools.size() >= PageShardingConstants.MAX_FORK_JOIN_POOL_SIZE) {
// 池子满了,睡眠1s
Thread.sleep(1000);
}
forkJoinPool = new ForkJoinPool(shardNum);
forkJoinPools.add(forkJoinPool);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
forkJoinPoolLock.unlock();
}
// 线程池中添加任务
ForkJoinTask<Integer> futureTask;
for (int i = 1;; i++) {
futureTask = forkJoinPool.submit(new PageShardingHandle(i, size, shardNum, pageFunction,
(data) -> this.esOperationApi.save(data, esIndexEnum)));
// 如果此处分页处理的数据没有达到最大数量,说明分页见底
if (futureTask.join() < maxPageSize) {
break;
}
}
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
forkJoinPools.remove(forkJoinPool);
}
}
}
}