Flink动态生成TypeInformation



我试图通过RichMapFunction<row,>在数据流的一行中解析嵌套字段。它的输入和输出是Row类型。这个嵌套的列可以包含任意数量的字段。

DataStream<Row> outStream =  stream.map(new ParsePayload(functionMap, inputTypeInformation))
.returns(<output TypeInformation>)
.uid("ParseNestedColumn");
private static class ParsePayload extends RichMapFunction<Row, Row> implements Serializable
{
@Override
public Row map(Row row) throws Exception {
<business logic>
…….
return resultRow;
}
}

问题是,我想只在评估映射函数或通过创建输出行后返回一行的类型信息,因为一行中的字段不是固定的。

我尝试了Types.ROW_NAMED()和ResultTypeQueryable接口,但两者都在计算映射函数之前检查类型信息,这样我就不能向流提供类型信息。

p。S -我不想为我的作业启用泛型类型。

我认为这是不可能的。

如果您认为TypeInformation用于生成源所需的序列化器,例如,因此您将在这里创建循环依赖关系(源需要TypeInformation将数据传递给map,但它不能,因为map需要首先提供TypeInformation)。

有不同的方法可以做到这一点,但它们都需要使用Generic*类或以byte[]String的形式获取数据并手动解析为预期的类型,或者尝试实现可以代表所有可能性的自定义类型,您需要处理。

相关内容

  • 没有找到相关文章

最新更新