Flink-将Avro数据流转换为表



我在Kafka中有Avro格式的消息。这些必须转换为表并使用SQL进行选择,然后转换为流并最终下沉。有多个Kafka主题具有不同的Avro模式,因此需要动态表。

这是我使用的代码

StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
FlinkKafkaConsumer<MyAvroClass> kafkaConsumer = ...;
var kafkaInputStream = env.addSource(kafkaConsumer, "kafkaInput");
Table table = tableEnv.fromDataStream(kafkaInputStream);
tableEnv.executeSql("DESCRIBE " + table).print();
...

MyAvroClass是Avro类,它扩展了SpecificRecordBase并包含一个数组
此类的代码。

public class MyAvroClass extends SpecificRecordBase implements SpecificRecord {
// avro fields
private String event_id;
private User user;
private List<Item> items; 

// getter, setters, constructors, builders, ...
}

我无法访问items字段的元素。当我打印表格描述时,我看到项目的类型是ANY

+------------+-------------------------------------------------------------+------+-----+--------+-----------+
|       name |                                                        type | null | key | extras | watermark |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+
|   event_id |                                                      STRING | true |     |        |           |
|      items |                        LEGACY('RAW', 'ANY<java.util.List>') | true |     |        |           |
|       user |  LEGACY('STRUCTURED_TYPE', 'POJO<com.company.events.User>') | true |     |        |           |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+  

如何将其转换为可用于从项查询的类型?提前感谢

我目前正在使用此方法。这个想法是SpecificRecord -(by AvroSerializationSchema)-> binary -(by AvroRowDeserializationSchema)-> Row。请注意,您需要使用FLINK-23885注释中建议的.returns来表示.map的输出类型。

public static <T extends SpecificRecord> Table toTable(StreamTableEnvironment tEnv,
DataStream<T> dataStream,
Class<T> cls) {
RichMapFunction<T, Row> avroSpecific2RowConverter = new RichMapFunction<>() {
private transient AvroSerializationSchema<T> avro2bin = null;
private transient AvroRowDeserializationSchema bin2row = null;
@Override
public void open(Configuration parameters) throws Exception {
avro2bin = AvroSerializationSchema.forSpecific(cls);
bin2row = new AvroRowDeserializationSchema(cls);
}
@Override
public Row map(T value) throws Exception {
byte[] bytes = avro2bin.serialize(value);
Row row = bin2row.deserialize(bytes);
return row;
}
};
SingleOutputStreamOperator<Row> rows = dataStream.map(avroSpecific2RowConverter)
// https://issues.apache.org/jira/browse/FLINK-23885
.returns(AvroSchemaConverter.convertToTypeInfo(cls));
return tEnv.fromDataStream(rows);
}

我遇到了一个类似的问题,Flink Table的类型插值无法获取java.util.List或java.util.Map,尽管官方支持它。我找到了一个变通方法(读作:哈克(,我想分享一下。

步骤1:当把你的数据映射到POJO时,坚持使用你知道会正确插值的字段。在我的情况下,我有Map<字符串,字符串>其在对CCD_ 7进行插值时失败。我将其连接到一个单独的字符串中(例如,逗号分隔的条目,其中每个条目都是"key:value"。它们连接到单个字符串中(。

步骤2:对于您的输入数据流,请确保将其转换为DataStream[MY_POJO_TYPE]。

第三步:像往常一样进行Table table = tableEnv.fromDataStream(kafkaInputStream);

步骤4:使用ScalarFunction对表执行另一个转换。在我的例子中,我编写了一个用户定义的标量函数,它接受String,并输出Map<字符串,字符串>。奇怪的是,当在表抽象中插入Map AFTER数据时,Flink能够将类型正确地插入到Flink Map类型中。

以下是用户定义的标量函数(在java中(的大致示例:

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.functions.ScalarFunction;
public class TagsMapTypeScalarFunction extends ScalarFunction {
// See
// https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#scalar-functions for reference implementation and how interfacing with ScalarFunction works.
public Map<String, String> eval(String s) {
// input is comma delimited key:value pairs.
return Arrays.stream(s.split(","))
.filter(kv -> kv != "")
.map(kv -> kv.split(":"))
.filter(pair -> pair.length == 2)
.filter(pair -> Arrays.stream(pair).allMatch(token -> token != ""))
.collect(Collectors.toMap(pair -> pair[0].trim(), pair -> pair[1].trim()));
}
}

以下是调用的大致样子(在Scala中(:


//This table has a field "tags" which is the comma-delimited, key:value squished string.
val transformedTable = tableEnv.fromDataStream(kafkaInputStream: DataStream[POJO])
tableEnv.createTemporaryFunction(
"TagsMapTypeScalarFunction",
classOf[TagsMapTypeScalarFunction]
);
val anotherTransform =
transformedTable
.select($"*", call("TagsMapTypeScalarFunction", $"tags").as("replace_tags"))
.dropColumns($"tags")
.renameColumns($"replace_tags".as("tags"))
anotherTransform

这当然有点";"忙";工作从映射转换为字符串,并返回为映射。但这比被卡住要好。

最新更新