如何使用java在Apache flink中读取json文件格式。 我找不到任何合适的代码来使用 java 在 flink 中读取 json 文件并在其上进行一些转换。 任何建议或代码都非常感谢。
有关将 Kafka 与 DataStream API 配合使用的信息,请参阅 https://stackoverflow.com/a/62072265/2000823。这个想法是实现一个适当的DeserializationSchema
,或KafkaDeserializationSchema
。在我上面链接的答案中有一个示例(以及指向更多内容的指针(。
或者,如果要使用表 API 或 SQL,则更容易。您可以使用一些 DDL 来配置它。例如:
CREATE TABLE minute_stats (
`minute` TIMESTAMP(3),
`currency` STRING,
`revenueSum` DOUBLE,
`orderCnt` BIGINT,
WATERMARK FOR `minute` AS `minute` - INTERVAL '10' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'minute_stats',
'connector.properties.zookeeper.connect' = 'not-needed',
'connector.properties.bootstrap.servers' = 'kafka:9092',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json'
);
为了在从文件读取时在本地尝试操作,您需要以不同的方式执行操作。像这样的东西
DataStreamSource<String> rawInput = env.readFile(
new TextInputFormat(new Path(fileLocation)), fileLocation);
DataStream<Event> = rawInput.flatMap(new MyJSONTransformer());
其中MyJSONTransformer
可以使用杰克逊ObjectMapper
将 JSON 转换为一些方便的事件类型(POJO(。