Apache Flink:如何使用Java Map流(或包含DTO的Map)?



我正在使用 Flink,并且有一股 JSON 字符串流到达我的系统,其中包含动态变化的字段和嵌套字段。所以我不能模拟和转换这个传入的 JSON 作为静态 POJO,我必须依靠地图。

我的第一个转换是使用 GSON 解析将 JSON 字符串流转换为 Map 对象流,然后将映射包装在名为 Data 的 DTO 中。

(inside the first map transformation)
LinkedTreeMap map = gson.fromJson(input, LinkedTreeMap.class);
Data data = new Data(map); // Data has getters, setters for the map and implements Serializable

在此转换处理之后,我尝试将生成的流馈送到我的自定义 Flink 接收器中时,会出现问题。不会在接收器中调用调用函数。但是,如果我从包含 DTO 的这张地图更改为没有地图的基元或常规 DTO,则接收器可以工作。

我的 DTO 看起来像这样:

public class FakeDTO {
private String id;
private LinkedTreeMap map; // com.google.gson.internal
// getters and setters
// constructors, empty and with fields

我尝试了以下两种解决方案:

env.getConfig().addDefaultKryoSerializer(LinkedTreeMap.class,MapSerializer.class; 
env.getConfig().disableGenericTypes();

在这种情况下,我可以使用任何专家建议吗?

我能够解决这个问题。在我的 Flink 日志中,我看到找不到一个名为 ReflectionSerializerFactory 类的 Kryo 文件。我在 maven 中更新了 Kryo 版本,并为我的地图使用了 Map 类型,Flink 文档说 Flink 支持该类型。

只需确保在代码中指定泛型类型,并在地图的 POJO 中添加 getter 和 setter。

我还使用 .returns(xyz.class( 类型去换来避免类型擦除的影响。

相关内容

  • 没有找到相关文章

最新更新