Flink SQL: "rowtime timestamp is null" or "rowtime timestamp is not defined"

DataStream<UserInfo> keyedStream = executionEnvironment
    .addSource(new UserDataSource());
// Problem: the stream returned by this call is discarded, so keyedStream
// still carries no timestamps or watermarks.
keyedStream.assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
Table table = tableEnv
    .sqlQuery(
        "SELECT userId,COUNT(userId) as ticks,TUMBLE_END(startime,INTERVAL '5' SECOND) as startime FROM test "
            + "GROUP BY TUMBLE(startime,INTERVAL '5' SECOND),userId");
// Keep only the "add" messages of the retract stream and unwrap the rows.
DataStream<Row> userInfoDataStream = tableEnv.toRetractStream(table, Row.class)
    .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
        @Override
        public boolean filter(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
            return booleanUserInfoTuple2.f0;
        }
    }).map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
        @Override
        public Row map(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
            return booleanUserInfoTuple2.f1;
        }
    });
JdbcSink sink = new JdbcSink();
userInfoDataStream.addSink(sink);

The usage above is wrong, for the following reason:

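The root cause is that the code calls assignTimestampsAndWatermarks on keyedStream but never uses the result of that call. The stream registered as a table therefore has no timestamps, which is why the rowtime attribute is reported as null or not defined. Rewriting the code like this makes it work: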

DataStream<UserInfo> keyedStream = executionEnvironment
    .addSource(new UserDataSource())
    .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");


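Calling assignTimestampsAndWatermarks on a stream does not modify that stream; it returns a new stream that has timestamps and watermarks.
The code can also be fixed as follows, which may make it clearer what is happening: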

DataStream<UserInfo> streamWithTSandWMs = keyedStream
    .assignTimestampsAndWatermarks(new MessageWaterEmitter());
tableEnv.registerDataStream("test", streamWithTSandWMs, "userId,ticks,startime.rowtime");
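
The same pitfall can be reproduced without Flink. The following is only a toy model, not Flink's API: ToyStream and its assignTimestamps method are hypothetical names made up for illustration. It sketches the behaviour described above, where a transformation returns a new object and leaves the receiver untouched, so a discarded return value means the change is lost.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ImmutableStreamDemo {

    /** Toy stand-in for a stream (hypothetical; NOT Flink's DataStream). */
    static final class ToyStream {
        private final List<String> elements;
        private final boolean hasTimestamps;

        ToyStream(List<String> elements, boolean hasTimestamps) {
            this.elements = Collections.unmodifiableList(elements);
            this.hasTimestamps = hasTimestamps;
        }

        /** Returns a NEW stream with timestamps; this instance is not changed. */
        ToyStream assignTimestamps() {
            return new ToyStream(elements, true);
        }

        boolean hasTimestamps() {
            return hasTimestamps;
        }
    }

    public static void main(String[] args) {
        ToyStream source = new ToyStream(Arrays.asList("a", "b"), false);

        // Wrong: the returned stream is thrown away, source is unchanged.
        source.assignTimestamps();
        System.out.println(source.hasTimestamps()); // prints "false"

        // Right: keep the returned stream and use it from here on.
        ToyStream withTimestamps = source.assignTimestamps();
        System.out.println(withTimestamps.hasTimestamps()); // prints "true"
    }
}
```

In the Flink code, registering streamWithTSandWMs instead of keyedStream corresponds to the second case.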

Original source: http://saoniuhuo.com/question/detail-1911309.html