flink 整合mybatis 并进行单个/批量sink,(不需要mybatis-config.xml!!!)

目录

1.依赖

2.编写sink继承 RichSinkFunction

3.MyDataSource 

 4.BookMapper

5.BookMapper.xml

6.使用


 之前看到过其他的关于flink整合mybatis的博客,比如flink 流式处理中如何集成mybatis框架,都用到了一个叫做"mybatis-config.xml" 的全局配置文件,但是我发现这种方式用在flink里面不是很灵活,后来我通过分析mybatis的源码,通过代码将"mybatis-config.xml"给替换掉,这样flink就可以动态加载mapper文件,每个sink可以只是加载和自己相关的mapper,从而减少加载不必要的mapper,使用更灵活。

1.依赖

 <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.1.21</version>
 </dependency>
 <dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.5.2</version>
 </dependency>
 <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.11</version>
 </dependency>

2.编写sink继承 RichSinkFunction

Slf4j
public class BookSink extends RichSinkFunction<List<Book>> {
    /**
     * 数据库连接配置
     */
    private MyDataSource myDataSource;
    /**
     * sqlSessionFactory
     */
    private SqlSessionFactory sqlSessionFactory;

    /***
     * @description : 构造传入数据源配置
     * @param myDataSource : 数据源配置
     */
    public BookSink(MyDataSource myDataSource) {
        this.myDataSource = myDataSource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //添加需要操作的mapper
        Set<String> mapperXmls = new HashSet<>();
        mapperXmls.add("mapper/bookMapper.xml");

        //组装dataSource
        if (myDataSource == null) {
            throw new RuntimeException("数据库连接配置参数不能为空");
        }
        if (StringUtils.isBlank(myDataSource.getUrl())) {
            throw new RuntimeException("url is null");
        }
        if (StringUtils.isBlank(myDataSource.getDriver())) {
            throw new RuntimeException("driver is null");
        }
        if (StringUtils.isBlank(myDataSource.getUserName())) {
            throw new RuntimeException("userName is null");
        }
        if (StringUtils.isBlank(myDataSource.getPassword())) {
            throw new RuntimeException("password is null");
        }

        DruidDataSource dataSource = new DruidDataSource();
        //设置连接参数
        dataSource.setUrl(myDataSource.getUrl());
        dataSource.setDriverClassName(myDataSource.getDriver());
        dataSource.setUsername(myDataSource.getUserName());
        dataSource.setPassword(myDataSource.getPassword());

        //配置初始化大小、最小、最大
        dataSource.setInitialSize(myDataSource.getInitialSize());
        dataSource.setMinIdle(myDataSource.getMinIdle());
        dataSource.setMaxActive(myDataSource.getMaxActive());

        dataSource.setRemoveAbandoned(false);
        //超时时间;单位为秒。180秒=3分钟
        dataSource.setRemoveAbandonedTimeout(180);

        //配置一个连接在池中最小生存的时间,单位是毫秒
        dataSource.setMinEvictableIdleTimeMillis(300000);
        //配置获取连接等待超时的时间单位毫秒
        dataSource.setMaxWait(60000);
        //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        dataSource.setTimeBetweenEvictionRunsMillis(60000);
        //防止过期
        dataSource.setValidationQuery("SELECT '1'");
        dataSource.setTestWhileIdle(true);
        // 每次获取连接时测试连接是否有效
        dataSource.setTestOnBorrow(true);
        dataSource.setTestOnReturn(true);

        //是否缓存preparedStatement
        dataSource.setPoolPreparedStatements(false);
        dataSource.setMaxOpenPreparedStatements(100);
        //asyncInit是1.1.4中新增加的配置,如果有initialSize数量较多时,打开会加快应用启动时间
        dataSource.setAsyncInit(true);
        dataSource.setName(myDataSource.getUserName());

        //获取sqlSessionFactory
        TransactionFactory transactionFactory = new JdbcTransactionFactory();
        Environment environment = new Environment("test-data", transactionFactory, dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
        configuration.setCacheEnabled(false);
        configuration.setUseGeneratedKeys(true);
        for (String mapperXml : mapperXmls) {
            ErrorContext.instance().resource(mapperXml);
            InputStream inputStream = Resources.getResourceAsStream(mapperXml);
            XMLMapperBuilder mapperParser = new XMLMapperBuilder(inputStream, configuration, mapperXml, configuration.getSqlFragments());
            mapperParser.parse();
        }
        sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
    }

    @Override
    public void invoke(List<Book> values, Context context) throws Exception {
        SqlSession sqlSession = null;
        try {
            //获取sqlSession
            sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, true);
            sqlSession.getMapper(BookMapper.class).batchInsert(values);
        } catch (Exception e) {
            log.error(this.getClass().getName() + ".invoke()执行失败", e);
            //重试一次
            try {
                if (sqlSession != null) {
                    sqlSession.getMapper(BookMapper.class).batchInsert(values);
                }
            } catch (Exception ex) {
                log.error(this.getClass().getName() + ".invoke()重试执行失败", e);
            }
        } finally {
            //关闭sqlSession
            if (sqlSession != null) {
                sqlSession.close();
            }
        }
    }
}

3.MyDataSource 

@Data
public class MyDataSource implements Serializable {
    /**
     * 数据源地址
     */
    private String url;
    /**
     * 数据源驱动
     */
    private String driver;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接池最大活跃数
     */
    private Integer maxActive;
    /**
     * 连接池初始化数量
     */
    private Integer initialSize;
    /**
     * 连接池最小数据
     */
    private Integer minIdle;
}

 4.BookMapper

/**
 * 提供单个插入和批量插入的方法
 * @author denglin
 * @since 2020-12-16
 */
public interface BookMapper extends Serializable {
    /***
     * @description : 插入单个记录到数据库
     * @param entity : 请求参数
     */
    void insert(Book entity);

    /***
     * @description : 批量插入记录到数据库
     * @param entities : 请求参数
     */
    void batchInsert(List<Book> entities);
}

5.BookMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.mapper.BookMapper">
    <!--插入单个记录-->
    <select id="insert" parameterType="com.test.Book">
        INSERT INTO book
        (
            name
            ,price
            ,created
            ,modified
        )
        VALUES
        (
             #{name}
            ,#{price}
            ,now()
            ,now()
        )
        ON DUPLICATE KEY UPDATE
            modified = now()

    </select>
    <!--批量插入-->
    <select id="batchInsert" parameterType="com.test.Book">
        INSERT INTO os_top_rank
        (
        name
        ,price
        ,created
        ,modified
        )
        VALUES
        <foreach collection="list" item="item" separator=",">
            (
            #{name}
            ,#{price}
            ,now()
            ,now()
            )
        </foreach>
        ON DUPLICATE KEY UPDATE
        modified = now()
    </select>
</mapper>

6.使用

//此处使用伪代码
MyDataSource myDataSource = new MyDataSource();
myDataSource.setUrl(...);
myDataSource.setDriver(...);
...;

xxxStream
.keyBy(...)
.process(...)
.addSink(new BookSink(myDataSource));