I have a JSON data object like this:
```json
{
  "monitorId": 865,
  "deviceId": "94:54:93:49:96:13",
  "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
  "state": 2,
  "time": 1593687809180
}
```
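Because `data` holds a string whose content is JSON, a parser sees it as a plain string on the first pass and it has to be decoded a second time. The following plain-Python sketch (standard library only, outside Flink) shows this with the sample record above; the variable names are illustrative only.

```python
import json

# Outer record as it might arrive from Kafka; the value of "data" is a string
# whose content is itself JSON (the inner quotes are escaped).
RAW_RECORD = (
    r'{"monitorId": 865, "deviceId": "94:54:93:49:96:13", '
    r'"data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,'
    r'\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}", '
    r'"state": 2, "time": 1593687809180}'
)

outer = json.loads(RAW_RECORD)     # first pass: outer["data"] is still a str
inner = json.loads(outer["data"])  # second pass: now a dict of floats

print(type(outer["data"]).__name__)  # str
print(inner["0001"], inner["0006"])  # 105.0 103.3
```

The second `json.loads` is what the UDF below has to perform on the `data` column.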
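The `data` field is itself a JSON object serialized as a string. How can I express this schema with Flink's Table API? I tried to create a UDF that takes the JSON string and outputs the parsed content, but I can't find a way to populate the `DataTypes.ROW` object: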
```python
from pyflink.table import DataTypes
from pyflink.table.descriptors import Json, Kafka, Schema

# Source table: raw records from Kafka, with "data" kept as a JSON string.
(
    t_env.connect(
        Kafka()
        .version("universal")
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", PROD_ZOOKEEPER)
        .start_from_latest()
    )
    .with_format(
        Json()
        .json_schema(
            """
            {
              "type": "object",
              "properties": {
                "monitorId": {
                  "type": "string"
                },
                "deviceId": {
                  "type": "string"
                },
                "data": {
                  "type": "string"
                },
                "state": {
                  "type": "integer"
                },
                "time": {
                  "type": "string"
                }
              }
            }
            """
        )
    )
    .with_schema(
        Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.STRING())
        .field("state", DataTypes.STRING())
    )
    .register_table_source(INPUT_TABLE)
)
```
```python
# Sink table: "data" is declared as a ROW so the parsed values can be written out.
(
    t_env.connect(
        Kafka()
        .version("universal")
        .topic(OUTPUT_TOPIC)
        .property("bootstrap.servers", LOCAL_KAFKA)
        .property("zookeeper.connect", LOCAL_ZOOKEEPER)
        .start_from_latest()
    )
    .with_format(
        Json()
        .json_schema(
            """
            {
              "type": "object",
              "properties": {
                "monitorId": {
                  "type": "string"
                },
                "data": {
                  "type": "string"
                },
                "time": {
                  "type": "string"
                }
              }
            }
            """
        )
    )
    .with_schema(
        Schema()
        .field("monitorId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())]))
    )
    .register_table_sink(OUTPUT_TABLE)
)
```
```python
import json

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ?  # <--- how do I populate the DataTypes.ROW with each individual value from data?


t_env.register_function(
    "data_converter",
    udf(DataConverter(),
        input_types=[DataTypes.STRING()],
        result_type=DataTypes.ROW([
            DataTypes.FIELD("feature1", DataTypes.STRING())
        ])))
```
```python
(
    t_env.from_path(INPUT_TABLE)
    .select("monitorId, time, data_converter(data)")
    .insert_into(OUTPUT_TABLE)
)
t_env.execute("IU pyflink job")
```
If you want the result type of a Python UDF to be `DataTypes.ROW`, you can wrap the return value in the Python `Row` class. `Row` extends Python's `tuple`. You can import it with `from pyflink.table.types import Row`.