flink 整合mybatis 并进行单个/批量sink,(不需要mybatis-config.xml!!!)
目录
之前看到过其他的关于flink整合mybatis的博客,比如flink 流式处理中如何集成mybatis框架,都用到了一个叫做"mybatis-config.xml" 的全局配置文件,但是我发现这种方式用在flink里面不是很灵活,后来我通过分析mybatis的源码,通过代码将"mybatis-config.xml"给替换掉,这样flink就可以动态加载mapper文件,每个sink可以只是加载和自己相关的mapper,从而减少加载不必要的mapper,使用更灵活。
1.依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.21</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
2.编写sink继承 RichSinkFunction
Slf4j
public class BookSink extends RichSinkFunction<List<Book>> {
/**
* 数据库连接配置
*/
private MyDataSource myDataSource;
/**
* sqlSessionFactory
*/
private SqlSessionFactory sqlSessionFactory;
/***
* @description : 构造传入数据源配置
* @param myDataSource : 数据源配置
*/
public BookSink(MyDataSource myDataSource) {
this.myDataSource = myDataSource;
}
@Override
public void open(Configuration parameters) throws Exception {
//添加需要操作的mapper
Set<String> mapperXmls = new HashSet<>();
mapperXmls.add("mapper/bookMapper.xml");
//组装dataSource
if (myDataSource == null) {
throw new RuntimeException("数据库连接配置参数不能为空");
}
if (StringUtils.isBlank(myDataSource.getUrl())) {
throw new RuntimeException("url is null");
}
if (StringUtils.isBlank(myDataSource.getDriver())) {
throw new RuntimeException("driver is null");
}
if (StringUtils.isBlank(myDataSource.getUserName())) {
throw new RuntimeException("userName is null");
}
if (StringUtils.isBlank(myDataSource.getPassword())) {
throw new RuntimeException("password is null");
}
DruidDataSource dataSource = new DruidDataSource();
//设置连接参数
dataSource.setUrl(myDataSource.getUrl());
dataSource.setDriverClassName(myDataSource.getDriver());
dataSource.setUsername(myDataSource.getUserName());
dataSource.setPassword(myDataSource.getPassword());
//配置初始化大小、最小、最大
dataSource.setInitialSize(myDataSource.getInitialSize());
dataSource.setMinIdle(myDataSource.getMinIdle());
dataSource.setMaxActive(myDataSource.getMaxActive());
dataSource.setRemoveAbandoned(false);
//超时时间;单位为秒。180秒=3分钟
dataSource.setRemoveAbandonedTimeout(180);
//配置一个连接在池中最小生存的时间,单位是毫秒
dataSource.setMinEvictableIdleTimeMillis(300000);
//配置获取连接等待超时的时间单位毫秒
dataSource.setMaxWait(60000);
//配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
dataSource.setTimeBetweenEvictionRunsMillis(60000);
//防止过期
dataSource.setValidationQuery("SELECT '1'");
dataSource.setTestWhileIdle(true);
// 每次获取连接时测试连接是否有效
dataSource.setTestOnBorrow(true);
dataSource.setTestOnReturn(true);
//是否缓存preparedStatement
dataSource.setPoolPreparedStatements(false);
dataSource.setMaxOpenPreparedStatements(100);
//asyncInit是1.1.4中新增加的配置,如果有initialSize数量较多时,打开会加快应用启动时间
dataSource.setAsyncInit(true);
dataSource.setName(myDataSource.getUserName());
//获取sqlSessionFactory
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("test-data", transactionFactory, dataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration(environment);
configuration.setCacheEnabled(false);
configuration.setUseGeneratedKeys(true);
for (String mapperXml : mapperXmls) {
ErrorContext.instance().resource(mapperXml);
InputStream inputStream = Resources.getResourceAsStream(mapperXml);
XMLMapperBuilder mapperParser = new XMLMapperBuilder(inputStream, configuration, mapperXml, configuration.getSqlFragments());
mapperParser.parse();
}
sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
}
@Override
public void invoke(List<Book> values, Context context) throws Exception {
SqlSession sqlSession = null;
try {
//获取sqlSession
sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, true);
sqlSession.getMapper(BookMapper.class).batchInsert(values);
} catch (Exception e) {
log.error(this.getClass().getName() + ".invoke()执行失败", e);
//重试一次
try {
if (sqlSession != null) {
sqlSession.getMapper(BookMapper.class).batchInsert(values);
}
} catch (Exception ex) {
log.error(this.getClass().getName() + ".invoke()重试执行失败", e);
}
} finally {
//关闭sqlSession
if (sqlSession != null) {
sqlSession.close();
}
}
}
}
3.MyDataSource
@Data
public class MyDataSource implements Serializable {
/**
* 数据源地址
*/
private String url;
/**
* 数据源驱动
*/
private String driver;
/**
* 用户名
*/
private String userName;
/**
* 密码
*/
private String password;
/**
* 连接池最大活跃数
*/
private Integer maxActive;
/**
* 连接池初始化数量
*/
private Integer initialSize;
/**
* 连接池最小数据
*/
private Integer minIdle;
}
4.BookMapper
/**
* 提供单个插入和批量插入的方法
* @author denglin
* @since 2020-12-16
*/
public interface BookMapper extends Serializable {
/***
* @description : 插入单个记录到数据库
* @param entity : 请求参数
*/
void insert(Book entity);
/***
* @description : 批量插入记录到数据库
* @param entities : 请求参数
*/
void batchInsert(List<Book> entities);
}
5.BookMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.mapper.BookMapper">
<!--插入单个记录-->
<select id="insert" parameterType="com.test.Book">
INSERT INTO book
(
name
,price
,created
,modified
)
VALUES
(
#{name}
,#{price}
,now()
,now()
)
ON DUPLICATE KEY UPDATE
modified = now()
</select>
<!--批量插入-->
<select id="batchInsert" parameterType="com.test.Book">
INSERT INTO os_top_rank
(
name
,price
,created
,modified
)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{name}
,#{price}
,now()
,now()
)
</foreach>
ON DUPLICATE KEY UPDATE
modified = now()
</select>
</mapper>
6.使用
//此处使用伪代码
MyDataSource myDataSource = new MyDataSource();
myDataSource.setUrl(...);
myDataSource.setDriver(...);
...;
xxxStream
.keyBy(...)
.process(...)
.addSink(new BookSink(myDataSource));