How to set an HBase event time for a temporal table join in Flink SQL



The left side is a fact table from Kafka:

CREATE TABLE  dig_user_join_kafka (
id string,
username string,
city_id string,
create_time TIMESTAMP(3),
WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND

)

The right side is a dimension table in HBase:

CREATE TABLE dim_city_hbase (
id string,
info ROW<
name string
>,
PRIMARY KEY (id) NOT ENFORCED
)

I want to do a temporal table join that uses event time:

insert into dim_city_join_hbase 
select id as id, 
ROW(username, city, create_time) as info 
from ( 
select kj.id as id, 
kj.username as username, 
hj.info.name as city, 
kj.create_time as create_time 
from dig_user_join_kafka kj
left join dim_city_hbase FOR SYSTEM_TIME AS OF kj.create_time  hj
on kj.city_id = hj.id
)

Now the error is:

The main method caused an error: Event-Time Temporal Table Join requires both primary
key and row time attribute in versioned table, but no row time attribute can be found

This means the HBase table has no row time attribute. How do I set an event time on the HBase table?

Many examples show an HBase temporal table joined using proctime, but none of them use event time.
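For comparison, the processing-time form that those examples use can be sketched roughly as follows. This is only an illustration: the computed column proc_time and every value in the WITH clause (topic, broker address, format) are assumptions that do not appear in the question, and dim_city_hbase is the dimension table defined above.

```sql
-- Fact table with an extra processing-time attribute.
-- proc_time is a hypothetical column added for this sketch.
CREATE TABLE dig_user_join_kafka (
  id STRING,
  username STRING,
  city_id STRING,
  create_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dig_user',                               -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Lookup join: each fact row is enriched with the HBase row
-- that is current at the moment the fact row is processed.
SELECT kj.id, kj.username, hj.info.name AS city, kj.create_time
FROM dig_user_join_kafka AS kj
LEFT JOIN dim_city_hbase FOR SYSTEM_TIME AS OF kj.proc_time AS hj
  ON kj.city_id = hj.id;
```

This form needs no row time attribute on the HBase table, which is why it does not trigger the error quoted above.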

Use the system function PROCTIME() or CURRENT_ROW_TIMESTAMP() to add a field that serves as the rowtime, then set a watermark on it:

" update_time AS CURRENT_ROW_TIMESTAMP(), " +
" WATERMARK FOR update_time AS update_time, " +

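You need to add an event time attribute to the HBase dim table. In your code, the table dig_user_join_kafka already has an event time attribute; the dimension table can be set up the same way: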

CREATE TABLE dim_city_hbase (
id STRING,
info ROW<name STRING, ts STRING>,  -- ts is a user-defined column
rowtime AS TO_TIMESTAMP(info.ts),
WATERMARK FOR rowtime AS rowtime,
PRIMARY KEY (id) NOT ENFORCED
)
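Put together with the query from the question, a minimal sketch might look like the following. The WITH options (connector version, HBase table name, ZooKeeper quorum) are placeholders that are not given in the question, and the sketch assumes the ts qualifier holds a string in the default 'yyyy-MM-dd HH:mm:ss' format that TO_TIMESTAMP can parse.

```sql
-- Dimension table with a row time attribute derived from a stored column.
CREATE TABLE dim_city_hbase (
  id STRING,                                  -- HBase row key
  info ROW<name STRING, ts STRING>,           -- column family "info"
  rowtime AS TO_TIMESTAMP(info.ts),           -- TIMESTAMP(3), same type as create_time
  WATERMARK FOR rowtime AS rowtime,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',                  -- placeholder version
  'table-name' = 'dim_city',                  -- placeholder
  'zookeeper.quorum' = 'localhost:2181'       -- placeholder
);

-- Event-time temporal join against the versioned dimension table.
SELECT kj.id, kj.username, hj.info.name AS city, kj.create_time
FROM dig_user_join_kafka AS kj
LEFT JOIN dim_city_hbase FOR SYSTEM_TIME AS OF kj.create_time AS hj
  ON kj.city_id = hj.id;
```

Keep in mind that the HBase connector reads the table as a bounded scan when it is used as a source, so this sketch would not see dimension rows written after the scan has finished.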

Or, as in @haoqi's answer:

CREATE TABLE dim_city_hbase (
id STRING,
info ROW<name STRING>,  
update_time AS CURRENT_ROW_TIMESTAMP(),
WATERMARK FOR update_time AS update_time,
PRIMARY KEY (id) NOT ENFORCED
)

Reference: the official documentation on the rowtime attribute
