Apache Flink:提取元组的类型信息



我正在使用FlinkKafkaConsumer09,其中我有一个ByteArrayDeseializationSchema实现KeyedDeserializationSchema>,现在在getProducedType中我如何提取TypeInformation。

我在文档中读到TypeExtractor.getForClass方法不支持ParameterizedTypes,我应该使用TypeExtractor的哪种方法来实现这一点?

我认为我们必须使用createTypeInfo方法,你能告诉我如何使用它来返回TypeInformation吗?

如果您想要Tuple2<byte[], byte[]>:的TypeInformation

return TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(byte[].class, byte[].class)

可能吗?

如果反序列化架构的返回类型是byte[],则可以使用PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO作为getProducedType的返回值。

在Java API 中键入提示

为了帮助Flink无法重建擦除的通用类型信息,Java API从0.9版本开始提供所谓的类型提示。类型提示告诉系统由函数生成的数据集的类型。下面给出了一个例子:

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);

return语句指定生成的类型,在本例中通过类指定。提示通过支持类型定义

Classes, for non-parameterized types (no generics)
Strings in the form of returns("Tuple2<Integer, my.SomeType>"), which are parsed and converted to a TypeInformation.
A TypeInformation directly

你可以试试这个:

case class CaseClassForYourExpectedMessage(key:String,value:String)
/* getProducedType method */
override def getProducedType: TypeInformation[CaseClassForYourExpectedMessage] =
TypeExtractor.getForClass(classOf[CaseClassForYourExpectedMessage])

相关内容

  • 没有找到相关文章

最新更新