示例代码:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = (
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql(
"""
CREATE TABLE table1 (
id INT,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data1.csv'
)
"""
)
table_env.execute_sql(
"""
CREATE TABLE table2 (
id2 INT,
ts2 TIMESTAMP(3),
WATERMARK FOR ts2 AS ts2 - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = '/home/alex/work/test-flink/data2.csv'
)
"""
)
table1 = table_env.from_path("table1")
table2 = table_env.from_path("table2")
print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts").to_pandas())
出现错误:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalLegacySink(name=[collect], fields=[id, ts])
+- FlinkLogicalCalc(select=[id, ts])
+- FlinkLogicalJoin(condition=[AND(=($2, $5), =($0, $3))], joinType=[inner])
:- FlinkLogicalCalc(select=[id, ts, CAST(ts) AS ts0])
: +- FlinkLogicalWatermarkAssigner(rowtime=[ts], watermark=[-($1, 5000:INTERVAL SECOND)])
: +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table1, source: [CsvTableSource(read fields: id, ts)]]], fields=[id, ts])
+- FlinkLogicalCalc(select=[id2, ts2, CAST(ts2) AS ts20])
+- FlinkLogicalWatermarkAssigner(rowtime=[ts2], watermark=[-($1, 5000:INTERVAL SECOND)])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, table2, source: [CsvTableSource(read fields: id2, ts2)]]], fields=[id2, ts2])
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
这似乎与其他类似的问题不同,比如这个问题,因为我遵循了文档中的说明,并指定了equi-join和时间间隔join(ts = ts2 && id = id2
(:
区间联接至少需要一个equi-join谓词和一个联接限制双方时间的条件。这种情况可能是由两个适当的范围谓词(<,<=,>=,>(或单个范围谓词定义相等谓词,用于比较相同类型的时间属性(即处理时间或事件时间(。
例如,以下谓词是有效的间隔联接条件:
ltime = rtime
如果问题是这些不是仅追加的表,我不知道如何使它们如此。
设置时间特性没有帮助:
StreamExecutionEnvironment.get_execution_environment().set_stream_time_characteristic(
TimeCharacteristic.EventTime
)
如果我使用处理时间来代替ts AS PROCTIME()
,则查询成功。但我认为我需要使用事件时间,我不明白为什么会有这种差异。
SQL中两个正则表之间的连接总是使用FROM a, b
或a JOIN b
以相同的方式表示。
然而,Flink为相同的语法提供了两种类型的连接运算符。一种是间隔联接,它要求时间属性根据时间将两个表相互关联。一个是常规SQL联接,它是以数据库中的通用方式实现的。
间隔联接只是一种流式优化,以在运行时保持较低的状态大小,并且不会在结果中产生更新。常规SQL联接运算符最终可以产生与间隔相同的结果,但维护成本更高。
为了区分间隔联接和常规联接,优化器在WHERE
子句中搜索一个对时间属性有效的谓词。对于间隔联接,输出可以始终包含两个行时间属性,用于外部时间操作(下游时间运算符(。因为两个行时间属性仍然与底层水印系统对齐。这意味着,例如,外部窗口或其他间隔联接可以再次使用时间属性。
然而,区间连接的实现具有FLINK-10211中已知并涵盖的一些缺点。由于设计不好,我们无法区分某些位置的间隔联接和规则联接。因此,我们需要假设常规联接可以是间隔联接,并且不能为用户自动将时间属性强制转换为TIMESTAMP
。相反,我们目前禁止在常规联接的输出中使用时间属性。
在某种程度上,这种限制有望消失,在此之前,用户有两种可能性:
-
不要对包含时间属性的表使用常规联接。您也可以使用嵌套的
SELECT
子句将其投影出去,或者在加入之前执行CAST
。 -
使用
SELECT
子句中的CAST(col AS TIMESTAMP)
将时间属性强制转换为常规时间戳。它将被下推到联接操作中。
您的异常表示您正在使用常规联接。间隔联接需要一个操作范围(即使只有1毫秒(。他们不支持平等。