Flink SQL: "rowtime timestamp is null" or "rowtime timestamp is not defined"
DataStream<UserInfo> keyedStream = executionEnvironment
        .addSource(new UserDataSource());
// BUG: the stream returned by this call is discarded, so keyedStream
// still carries no timestamps or watermarks.
keyedStream.assignTimestampsAndWatermarks(new MessageWaterEmitter());
// startime.rowtime declares startime as the event-time attribute.
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
// Count events per user in 5-second tumbling event-time windows.
Table table = tableEnv
        .sqlQuery(
                "SELECT userId,COUNT(userId) as ticks,TUMBLE_END(startime,INTERVAL '5' SECOND) as startime FROM test "
                        + "GROUP BY TUMBLE(startime,INTERVAL '5' SECOND),userId");
DataStream<Row> userInfoDataStream = tableEnv.toRetractStream(table, Row.class)
        // Keep only the add messages (f0 == true) and drop the retractions.
        .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                return booleanUserInfoTuple2.f0;
            }
        })
        // Unwrap the Row from the (flag, row) tuple.
        .map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
            @Override
            public Row map(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                return booleanUserInfoTuple2.f1;
            }
        });
JdbcSink sink = new JdbcSink();
userInfoDataStream.addSink(sink);
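The usage above is wrong, for the following reason:
The root cause is that when you call assignTimestampsAndWatermarks on keyedStream, you do nothing with the result of that call. If you rewrite the code as follows, it works: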
DataStream<UserInfo> keyedStream = executionEnvironment
        .addSource(new UserDataSource())
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
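Calling assignTimestampsAndWatermarks on a stream does not modify that stream; it returns a new stream that carries timestamps and watermarks.
It can also be fixed as follows, which may make it clearer what is happening: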
DataStream<UserInfo> streamWithTSandWMs = keyedStream
        .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", streamWithTSandWMs, "userId,ticks,startime.rowtime");
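The "returns a new stream, does not modify the old one" behavior can be reproduced without Flink. The sketch below is only an analogy: MiniStream, assignTimestamps and hasTimestamps are hypothetical names invented for illustration and are not part of the Flink API. It assumes nothing more than an immutable object whose transformation method returns a fresh instance, which is the same shape as the mistake described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.ToLongFunction;

public class ImmutableStreamSketch {

    /** Hypothetical stand-in for a DataStream: transformations never change the receiver. */
    static final class MiniStream<T> {
        private final List<T> elements;
        // null means "no timestamps assigned", i.e. no rowtime is available.
        private final ToLongFunction<T> timestampExtractor;

        MiniStream(List<T> elements) {
            this(elements, null);
        }

        private MiniStream(List<T> elements, ToLongFunction<T> timestampExtractor) {
            this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
            this.timestampExtractor = timestampExtractor;
        }

        /** Returns a NEW stream that knows its timestamps; 'this' is left untouched. */
        MiniStream<T> assignTimestamps(ToLongFunction<T> extractor) {
            return new MiniStream<>(elements, extractor);
        }

        boolean hasTimestamps() {
            return timestampExtractor != null;
        }
    }

    /** Mirrors the broken code: the returned stream is thrown away. */
    public static boolean discardedResult() {
        MiniStream<Long> stream = new MiniStream<Long>(Arrays.asList(1000L, 2000L));
        stream.assignTimestamps(Long::longValue); // result ignored
        return stream.hasTimestamps();
    }

    /** Mirrors the fixed code: the returned stream is the one that gets used. */
    public static boolean keptResult() {
        MiniStream<Long> stream = new MiniStream<Long>(Arrays.asList(1000L, 2000L))
                .assignTimestamps(Long::longValue);
        return stream.hasTimestamps();
    }

    public static void main(String[] args) {
        System.out.println("discarded result -> " + discardedResult());
        System.out.println("kept result      -> " + keptResult());
    }
}
```

Running main prints false for the discarded case and true for the kept case. In the Flink job the equivalent of the false case is registering a stream that has no rowtime, which matches the error in the title.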
Original source: http://saoniuhuo.com/question/detail-1911309.html