闪烁不显示数据的临时联接



我正试图在Flink文档上复制时态联接示例,但没有显示任何结果。也没有错误。

我的桌子:

CREATE TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '<my-server>',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = '<my-server>',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json'
);

插入

INSERT into currency_rates 
VALUES ('EURO', 0.0139, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into currency_rates 
VALUES ('CAD', 0.03101, TO_TIMESTAMP('2022-01-12 12:37:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions 
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions 
VALUES ('002', 'CAD', 5.20, TO_TIMESTAMP('2022-01-12 12:51:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions 
VALUES ('003', 'EURO', 12.12, TO_TIMESTAMP('2022-01-12 12:52:10', 'yyyy-MM-dd HH:mm:ss'));
INSERT into transactions 
VALUES ('004', 'CAD', 13.13, TO_TIMESTAMP('2022-01-12 12:53:20', 'yyyy-MM-dd HH:mm:ss'));

加入查询:

SELECT 
t.id,
t.total * c.eur_rate AS total_eur,
t.total, 
c.currency_code,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;

联接查询没有显示任何结果,我在那里找不到任何工作样本。

我错过了什么来让这个临时加入工作?

问题与水印有关。临时联接不会产生更新/撤回流,因此它必须等待currency_rates流在第一个事务发生时完成的证据,然后才能为该事务产生最终结果。(对于后续交易,依此类推。(

如果添加

INSERT into currency_rates 
VALUES ('EURO', 0.0130, TO_TIMESTAMP('2022-01-12 13:00:00', 'yyyy-MM-dd HH:mm:ss'));

这应该足以消除一些结果。

如果这不能解决问题,那么一些源(包括Kafka(使用的每个分区的水印可能就是问题所在。您可以通过确保每个Kafka分区都有一些数据,或者通过设置表exec-source空闲超时配置参数来解决这个问题,这样空闲分区就不会无限期地保留水印。

就我自己而言,我还必须将此属性添加到事务表中:

'scan.startup.mode' = 'earliest-offset',

如果不对表DDL进行此更改,我将出现错误。(请参见https://stackoverflow.com/a/70739865/2000823了解更多信息。(

这些是我从时间连接中得到的结果:

id    eur_rate   total_eur    total   currency_code   transaction_time
001   0.0139     0.126490      9.10   EURO            2022-01-12 12:50:00.000
003   0.0139     0.168468     12.12   EURO            2022-01-12 12:52:10.000
002   0.0310     0.161200      5.20   CAD             2022-01-12 12:51:10.000

最新更新