左边是来自kafka 的事实表
CREATE TABLE dig_user_join_kafka (
id string,
username string,
city_id string,
create_time TIMESTAMP(3),
WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
)
右边是hbase 的维度表
CREATE TABLE dim_city_hbase (
id string,
info ROW<
name string
>,
PRIMARY KEY (id) NOT ENFORCED
)
我想创建一个带有事件时间的时态表
insert into dim_city_join_hbase
select id as id,
ROW(username, city, create_time) as info
from (
select kj.id as id,
kj.username as username,
hj.info.name as city,
kj.create_time as create_time
from dig_user_join2_kafka kj
left join dim_city_hbase FOR SYSTEM_TIME AS OF kj.create_time hj
on kj.city_id = hj.id
)
现在,错误是
The main method caused an error: Event-Time Temporal Table Join requires both primary
key and row time attribute in versioned table, but no row time attribute can be found
这意味着hbase表没有行时间,如何设置hbase事件时间?
许多示例显示hbase时态表与proctime连接,但没有人使用事件时间
使用系统函数"PROCTIME((";或";CURRENT_ROW_TIMESTAMP((";添加一个字段作为rowtime,然后设置水印:
" update_time AS CURRENT_ROW_TIMESTAMP(), " +
" WATERMARK FOR update_time AS update_time, " +
您需要在hbase dim表上添加事件时间属性。从您的代码表dig_user_join_kafka设置了事件时间属性,维度表可以这样做:
CREATE TABLE dim_city_hbase (
id STRING,
info ROW<name STRING, ts STRING>, // ts is self defined column
rowtime AS TO_TIMESTAMP(ts),
WATERMARK FOR rowtime AS rowtime,
PRIMARY KEY (id) NOT ENFORCED
)
或者,像@haoqi回答
CREATE TABLE dim_city_hbase (
id STRING,
info ROW<name STRING>,
update_time AS CURRENT_ROW_TIMESTAMP(),
WATERMARK FOR update_time AS update_time,
PRIMARY KEY (id) NOT ENFORCED
)
参考:官方描述的rowtime属性