Flink cannot deserialize JSON generated by Debezium



I am trying to use Flink to consume the change event log produced by Debezium. The JSON is:

{
  "schema":{
  },
  "payload":{
    "before":null,
    "after":{
      "team_config_id":3800,
      "team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
      "team_config_sfdc_account_id":null,
      "team_config_sfdc_account_url":null,
      "team_config_business_type":5,
      "team_config_dpsa_status":0,
      "team_config_desc":null,
      "team_config_company_id":null,
      "team_config_hm_count_stages":null,
      "team_config_assign_credits_times":null,
      "team_config_real_renew_date":null,
      "team_config_action_date":null,
      "team_config_last_action_date":null,
      "team_config_business_tier_notification":"{}",
      "team_config_create_date":1670724933000,
      "team_config_update_date":1670724933000,
      "team_config_rediscovery_tier":0,
      "team_config_rediscovery_tier_notification":"{}",
      "team_config_sfdc_industry":null,
      "team_config_sfdc_market_segment":null,
      "team_config_unterminated_note_id":0
    },
    "source":{
    },
    "op":"c",
    "ts_ms":1670724933149,
    "transaction":null
  }
}

I tried two ways to declare the input schema.

The first way is to parse the JSON data directly:

create table team_config_source (
  `payload` ROW <
    `after` ROW <
      ...
      team_config_create_date timestamp(3),
      team_config_update_date timestamp(3),
      ...
    >
  >
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'json'
)

But Flink throws org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date, caused by java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0. Does Flink not support timestamps in this format?

I also tried another way, using the built-in debezium-json format:

create table team_config_source (
team_config_create_id int,
...
) WITH (
'connector' = 'kafka',
...
'format' = 'debezium-json'
)

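But Flink raised a different error, java.io.IOException: Corrupt Debezium JSON message, caused by java.lang.NullPointerException. I found someone saying that an update event should not have null as the value of before, but this message is a create event.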

Can anyone help me check the DDL?

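I'm not a Flink expert, but a TIMESTAMP in Flink is not epoch time. The JSON format expects a TIMESTAMP field as a date-time string (such as 2022-12-11 02:15:33), not as a number of milliseconds since the epoch, which is why parsing '1670724933000' fails at index 0.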
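A quick way to check the conversion is to run the sample value through the built-in functions in the SQL client. This is a sketch assuming Flink 1.13 or later (needed for TO_TIMESTAMP_LTZ and the quoted SET syntax). The value 1670724933000 comes from the sample message and is epoch milliseconds, whereas FROM_UNIXTIME expects epoch seconds, so it has to be divided by 1000 first; passed as-is it would be read as seconds and land tens of thousands of years in the future.

```sql
-- Interpret and display local-zone timestamps in UTC so the result does not depend on the machine.
SET 'table.local-time-zone' = 'UTC';

-- 1670724933000 ms after the epoch is 2022-12-11 02:15:33 UTC.
-- Precision 3 tells TO_TIMESTAMP_LTZ that the input is in milliseconds.
SELECT TO_TIMESTAMP_LTZ(1670724933000, 3);

-- FROM_UNIXTIME takes seconds and returns a 'yyyy-MM-dd HH:mm:ss' string,
-- which TO_TIMESTAMP then parses into a TIMESTAMP(3); the milliseconds are dropped.
SELECT TO_TIMESTAMP(FROM_UNIXTIME(1670724933000 / 1000));
```

TO_TIMESTAMP_LTZ keeps millisecond precision and avoids the string round trip, so it is usually the simpler choice when the source already carries epoch milliseconds.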

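In this case, you can declare the epoch values as BIGINT and derive the timestamps with computed columns. With a JSON-based format the BIGINT columns must keep the JSON field names, otherwise they are read as null. Also note that the values are in milliseconds while FROM_UNIXTIME expects seconds, so divide by 1000 first:

team_config_create_date BIGINT,
team_config_update_date BIGINT,
...
team_config_create_ts AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_date / 1000)),
team_config_update_ts AS TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_date / 1000))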
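A fuller sketch of the debezium-json variant is below. It assumes Flink 1.13 or later (for TO_TIMESTAMP_LTZ); the topic name, bootstrap servers, consumer group id and the `*_ts` column names are hypothetical placeholders. The BIGINT columns keep the JSON field names and hold epoch milliseconds, and computed columns turn them into TIMESTAMP_LTZ(3). Because the sample message wraps the change event in a top-level "schema"/"payload" envelope (what Kafka Connect emits when `value.converter.schemas.enable` is on), the sketch also sets 'debezium-json.schema-include' = 'true'. With the default of false, Flink looks for "op" at the top level of the message, which is a likely cause of the "Corrupt Debezium JSON message" error.

```sql
-- Sketch only: topic, servers, group id and *_ts column names are hypothetical.
CREATE TABLE team_config_source (
  team_config_id INT,
  team_config_team_id STRING,
  -- ... other fields of "after", named exactly as in the JSON ...
  -- Physical columns keep the JSON key names and hold epoch milliseconds.
  team_config_create_date BIGINT,
  team_config_update_date BIGINT,
  -- Computed columns: precision 3 means the input is epoch milliseconds.
  team_config_create_ts AS TO_TIMESTAMP_LTZ(team_config_create_date, 3),
  team_config_update_ts AS TO_TIMESTAMP_LTZ(team_config_update_date, 3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'team_config',                            -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
  'properties.group.id' = 'team-config-reader',       -- hypothetical
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  -- The messages carry the "schema"/"payload" envelope, so tell the format to unwrap it.
  'debezium-json.schema-include' = 'true'
);
```

If a plain TIMESTAMP(3) is needed downstream instead of TIMESTAMP_LTZ(3), the FROM_UNIXTIME(... / 1000) expression can be used in the computed columns instead, at the cost of dropping the milliseconds.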
