PyFlink unix epoch时间戳转换问题



我使用unix epoch时间戳来处理事件,我使用带有Kinesis连接器的表作为源表。我需要使用相同的时间戳字段作为水印。在python中怎么做呢?我使用的是Flink-1.11版本,因为这是AWS支持的最新版本。

事件格式:{'event_time': 1633098843692, 'ticker': 'AMZN'}

Python表:

CREATE TABLE event_input_table (
event_time TIMESTAMP,
ticker VARCHAR,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'inputstream1',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json' ,
'aws.credentials.provider' = 'ENV_VAR' 
)
CREATE TABLE event_input_table (
event_time BIGINT,
ip_src VARCHAR,
ip_dst VARCHAR,
domain ARRAY<VARCHAR>,
new_time as TO_TIMESTAMP(FROM_UNIXTIME(event_time))
)

最新更新