ElasticSearch 通用化批量数据同步器封装设计

一、设计需要满足的功能场景

ES 现在也是很多公司会用到的工具了,用来做数据的快速查询,那么不可避免的就会遇到需要将数据同步到 ES 的情况,为了做封装我们需要考虑到几个方面的问题。

  1. 数据量: 数据同步那么自然涉及到同步的数据量问题。通常的做法是把所有的数据查询到缓存中,然后再依次丢到 ES 里。但是假如数据量极大的情况就会把内存撑爆。因此设计上我们需要考虑分页分片同步,每次查个500条就同步500条,用时间换空间。
  2. 效率问题: 既然分页分片,每个分片任务是互不影响的,因此可以考虑用多线程来提高处理效率。
  3. 通用性: 因为需要考虑通用性,因此分页接口需要调用方传入,作为规范约定,分页返回的数据必须与ES字段一致

二、约定

规范约定,分页返回的数据必须与ES字段一致,目的是为了保证数据的正确性

至于存储工具的封装,参考:ElasticSearch 通用化批量存储封装设计

三、问题

1.如何知道分页是否全部处理完成了?

这里采用一种假设法,假设我们一页500条数据,每批5个线程处理,那么我们每批的线程共会处理线程 500*5 = 2500 条数据,假如有 10001 条数据要处理,那么之前的 4 批,每次都会处理满 2500 条,仅最后一次只处理 1 条数据。因此当我们发现处理数据的数量无法达到最大值时,说明数据处理完。

2.我们如何做到每批 5 个线程,每个线程处理 500 条这种操作?并且还要统计每批处理的数据量?

我们可以使用 Fork/Join 框架来实现,Fork/Join 中的每个任务应该为互不干扰,互不影响的任务,因而以 5*500 为一组的任务进入 Fork/Join 中进行处理时,每 500 为一个线程的任务就是互补影响的,最后只需统计处理数量。

四、功能封装

1、接口函数定义

分页接口函数: 用来传分页查询接口,PageResultDTO 自定义的分页返回值,Pageable 分页参数

@FunctionalInterface
public interface PageFunction {
    PageResultDTO page(Pageable pageable);
}

ES操作接口函数: ES 操作接口函数

@FunctionalInterface
public interface EsOperationFunction<T> {
    void handle(List<T> data);
}

2、常量定义

/**
 * @author Jinpeng Lin
 * @description 全量分片常量
 * @date 2022-05-30
 */
public class PageShardingConstants {
    /**
     * ForkJoinPool 线程池最大数据
     */
    public static final int MAX_FORK_JOIN_POOL_SIZE = 10;

    /**
     * 失败重试次数
     */
    public static final int ERROR_TIMES = 5;

    /**
     * 最小分片数量
     */
    public static final int MIN_SHARDING_NUM = 1;

    /**
     * 最小分片大小,每个分片 N 条
     */
    public static final int MIN_SHARDING_SIZE = 1;

    /**
     * 最大分片数量
     */
    public static final int MAX_SHARDING_NUM = 10;

    /**
     * 最大分片大小,每个分片 N 条
     */
    public static final int MAX_SHARDING_SIZE = 1000;

    /**
     * 默认分片数量
     */
    public static final int DEFAULT_SHARDING_NUM = 4;

    /**
     * 默认分片大小,每个分片 N 条
     */
    public static final int DEFAULT_SHARDING_SIZE = 500;
}

3、多线程分片处理器(Fork/Join 框架任务切分与处理)

我们用 Fork/Join 框架来作为分片处理器,Fork/Join 框架的好处在于在多线程处理中,可以帮我们实工作密取,空闲的线程会从繁忙线程的任务队列中,从任务队列后面获取一个任务去执行,以保证线程的利用率。

处理器的返回值为处理的数据总条数

cutTask: 是否已经分片标识,任务拆分后,此参数为 true,表示无需再拆分
page: 当前分片任务对应分页
size: 当前分片处理条数,即每页的条数
remainShardNum: 剩余分片数量,用于记录剩余还有多少分片要处理
pageFunction: 分页接口函数
esOperationFunction: ES操作函接口函数

/**
 * @author Jinpeng Lin
 * @description 分页处理组件
 * @date 2022-05-30
 */
public class PageShardingHandle extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 416664792640153410L;

    /**
     * 是否已经分片
     */
    private boolean cutTask;

    /**
     * 页码
     */
    private Integer page;

    /**
     * 每页条数
     */
    private Integer size;

    /**
     * 剩余分片数量
     */
    private Integer remainShardNum;

    /**
     * 分页接口函数
     */
    private PageFunction pageFunction;

    /**
     * ES操作接口函数
     */
    private EsOperationFunction esOperationFunction;


    public PageShardingHandle(Integer page, Integer size, Integer remainShardNum, PageFunction pageFunction,
        EsOperationFunction esOperationFunction) {
        this.cutTask = Boolean.FALSE;
        this.page = page;
        this.size = size;
        this.remainShardNum = remainShardNum;
        this.pageFunction = pageFunction;
        this.esOperationFunction = esOperationFunction;
    }

    public PageShardingHandle(boolean cutTask, Integer page, Integer size, Integer remainShardNum,
        PageFunction pageFunction, EsOperationFunction esOperationFunction) {
        this.page = page;
        this.size = size;
        this.remainShardNum = remainShardNum;
        this.cutTask = cutTask;
        this.pageFunction = pageFunction;
        this.esOperationFunction = esOperationFunction;
    }

    @Override
    protected Integer compute() {
        // 总条数
        Integer total = 0;
        // 已经任务进行分片,处理任务
        if (this.cutTask) {
            // 失败重试次数
            int errorNum = PageShardingConstants.ERROR_TIMES;
            // 失败原因
            Exception exception = null;
            // 分页对象
            PageResultDTO result = null;
            while (errorNum > 0) {
                try {
                    // 查询分页的数据
                    result = this.pageFunction.page(BasePageable.of(this.page, this.size));
                    // ES处理
                    if (result.getResult().size() > 0) {
                        this.esOperationFunction.handle(result.getResult());
                    }
                    total = result.getResult().size();
                    break;
                } catch (Exception e) {
                    errorNum--;
                    log.warn("ES剩余本地重试次数:{}次\n失败数据:{}\n失败异常:{}", errorNum,
                        result != null ? JSONUtil.toJsonStr(result) : "null", e.getMessage());
                    // 记录失败原因
                    if (errorNum == 0) {
                        exception = e;
                    }
                }
            }
            // 依然存在异常,则抛出异常
            if (exception != null) {
                throw new RuntimeException(exception);
            }
        } else {
            // 未分片,需要进行分片
            List<PageShardingHandle> handles = new ArrayList<>();
            while (this.remainShardNum > 0) {
                // page:传递下一页
                // remainShardNum:剩余分片数量减少
                // cutTask:任务为已分片任务
                handles.add(new PageShardingHandle(Boolean.TRUE, this.page++, this.size, --this.remainShardNum,
                    this.pageFunction, this.esOperationFunction));
            }
            // 提交分片任务
            invokeAll(handles);
            // 接收处理结果返回值,结果中汇总总条数
            for (PageShardingHandle handle : handles) {
                total += handle.join();
            }
        }
        return total;
    }
}

4、分页分片接口

/**
 * 分页分片封装
 */
public interface PageShardingApi {
    /**
     * 同步ES(已知总条数的情况下,自动进行分片计算)
     * @param total 总条数
     * @param pageFunction 分页查询操作
     * @param esIndexEnum ES对应索引
     */
    void syncEsByTotal(Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum);

    /**
     * 同步ES(已知总条数的情况下,自动进行分片计算)
     * @param size 每页条数
     * @param total 总条数
     * @param pageFunction 分页查询操作
     * @param esIndexEnum ES对应索引
     */
    void syncEsByTotal(Integer size, Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum);

    /**
     * 全量分片同步ES
     * @param pageFunction 分页查询操作
     * @param esIndexEnum ES对应索引
     */
    void syncEs(PageFunction pageFunction, EsIndexEnum esIndexEnum);

    /**
     * 全量分片同步ES
     * @param size 每页条数
     * @param pageFunction 分页查询操作
     * @param esIndexEnum ES对应索引
     */
    void syncEs(Integer size, PageFunction pageFunction, EsIndexEnum esIndexEnum);

    /**
     * 全量分片同步ES
     * @param size 每页条数
     * @param shardNum 分片数量
     * @param pageFunction 分页查询操作
     * @param esIndexEnum ES对应索引
     */
    void syncEs(Integer size, Integer shardNum, PageFunction pageFunction, EsIndexEnum esIndexEnum);
}

5、分页分片实现

EsIndexEnumEsOperationApi 参考:ElasticSearch 通用化批量存储封装设计

/**
 * @author Jinpeng Lin
 * @description 分页分片封装
 * @date 2022-05-30
 */
@Service
public class PageShardingApiImpl implements PageShardingApi {

    @Resource
    private EsOperationApi esOperationApi;

	/**
     * Redis 工具类,换成自己的
     */
	@Resource
    private RedisLockUtils redisLockUtils;

    /**
     * ForkJoinPool 线程池池子
     */
    private static final List<ForkJoinPool> forkJoinPools =
        new ArrayList<>(PageShardingConstants.MAX_FORK_JOIN_POOL_SIZE);

    @Override
    public void syncEsByTotal(Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
        this.syncEsByTotal(null, total, pageFunction, esIndexEnum);
    }

    @Override
    public void syncEsByTotal(Integer size, Integer total, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
        // 如果发现没有总条数或者总条数为 0,不进行后续操作
        if (total == null || total == 0) {
            return;
        }
        if (size == null) {
            size = PageShardingConstants.DEFAULT_SHARDING_SIZE;
        }
        Integer shardNum = total / size + (total % size == 0 ? 0 : 1);
        this.syncEs(size, shardNum, pageFunction, esIndexEnum);
    }

    @Override
    public void syncEs(PageFunction pageFunction, EsIndexEnum esIndexEnum) {
        this.syncEs(null, null, pageFunction, esIndexEnum);
    }

    @Override
    public void syncEs(Integer size, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
        this.syncEs(size, null, pageFunction, esIndexEnum);
    }

    @Override
    public void syncEs(Integer size, Integer shardNum, PageFunction pageFunction, EsIndexEnum esIndexEnum) {
        // 分片的数量不能大于最大数量
        if (shardNum == null) {
            shardNum = PageShardingConstants.DEFAULT_SHARDING_NUM;
        }
        if (size == null) {
            size = PageShardingConstants.DEFAULT_SHARDING_SIZE;
        }
        if (shardNum > PageShardingConstants.MAX_SHARDING_NUM) {
            shardNum = PageShardingConstants.MAX_SHARDING_NUM;
        }
        if (size > PageShardingConstants.MAX_SHARDING_SIZE) {
            size = PageShardingConstants.MAX_SHARDING_SIZE;
        }
        // 每次分片的最大查询到的数量:分片数量 * 每个分片的条数
        Integer maxPageSize = size * shardNum;
        
		// 判断池子是否满了
        ForkJoinPool forkJoinPool = null;
        try {
            // 池子上锁
            forkJoinPoolLock.lock();
            // 构建线程池
            try {
                // 池子已经满了,等待
                while (forkJoinPools.size() >= PageShardingConstants.MAX_FORK_JOIN_POOL_SIZE) {
                    // 池子满了,睡眠1s
                    Thread.sleep(1000);
                }
                forkJoinPool = new ForkJoinPool(shardNum);
                forkJoinPools.add(forkJoinPool);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                forkJoinPoolLock.unlock();
            }
            // 线程池中添加任务
            ForkJoinTask<Integer> futureTask;
            for (int i = 1;; i++) {
                futureTask = forkJoinPool.submit(new PageShardingHandle(i, size, shardNum, pageFunction,
                    (data) -> this.esOperationApi.save(data, esIndexEnum)));
                // 如果此处分页处理的数据没有达到最大数量,说明分页见底
                if (futureTask.join() < maxPageSize) {
                    break;
                }
            }
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
                forkJoinPools.remove(forkJoinPool);
            }
        }
    }
}