我正试图在Flink文档上复制时态联接示例,但没有显示任何结果。也没有错误。
我的桌子:
CREATE TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '<my-server>',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = '<my-server>',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json'
);
插入
INSERT into currency_rates
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into currency_rates
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));
加入查询:
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
t.total,
c.currency_code,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;
联接查询没有显示任何结果,我在那里找不到任何工作样本。
我错过了什么来让这个临时加入工作?
问题与水印有关。临时联接不会产生更新/撤回流,因此它必须等待currency_rates流在第一个事务发生时完成的证据,然后才能为该事务产生最终结果。(对于后续交易,依此类推。(
如果添加
INSERT into currency_rates
VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));
这应该足以消除一些结果。
如果这不能解决问题,那么一些源(包括Kafka(使用的每个分区的水印可能就是问题所在。您可以通过确保每个Kafka分区都有一些数据,或者通过设置表exec-source空闲超时配置参数来解决这个问题,这样空闲分区就不会无限期地保留水印。
就我自己而言,我还必须将此属性添加到事务表中:
'scan.startup.mode' = 'earliest-offset',
如果不对表DDL进行此更改,我将出现错误。(请参见https://stackoverflow.com/a/70739865/2000823了解更多信息。(
这些是我从时间连接中得到的结果:
id eur_rate total_eur total currency_code transaction_time
001 0.0139 0.126490 9.10 EURO 2022-01-12 12:50:00.000
003 0.0139 0.168468 12.12 EURO 2022-01-12 12:52:10.000
002 0.0310 0.161200 5.20 CAD 2022-01-12 12:51:10.000