Select all fields as a JSON string into a new field in Flink SQL



I'm using the Flink Table API. I have a table definition, and I want to select all fields and convert them to a JSON string in a new field.

My table has three fields: a: String, b: Int, c: Timestamp.

If I do

INSERT INTO kinesis
SELECT a, b, c from my_table

the Kinesis stream has JSON records like:

{
  "a": value,
  "b": value,
  "c": value
}

However, I want something similar to what Spark offers:

INSERT INTO kinesis
SELECT "constant_value" as my_source, to_json(struct(*)) as payload from my_table

So the expected result is:

{
  "my_source": "constant_value",
  "payload": "json string from the first example that has a, b, c"
}

I don't see any to_json(struct()) function in Flink. Is it possible to implement this?

You may have to implement your own user-defined function.

Here is what I did. I assume the input to the UDF looks like:

to_json('col1', col1, 'col2', col2)

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RowToJson extends ScalarFunction {
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row) {
        if (row.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must be key/value pairs!");
        }
        // Build a naive JSON object from the (name, value) pairs:
        // numbers and booleans are emitted unquoted, everything else is quoted (no escaping).
        return IntStream.range(0, row.length)
                .filter(index -> index % 2 == 0)
                .mapToObj(index -> {
                    String name = row[index].toString();
                    Object value = row[index + 1];
                    String jsonValue = (value instanceof Number || value instanceof Boolean)
                            ? String.valueOf(value)
                            : "\"" + value + "\"";
                    return "\"" + name + "\":" + jsonValue;
                })
                .collect(Collectors.joining(",", "{", "}"));
    }
}
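
To use it from SQL, the function has to be registered first. A minimal sketch, assuming tableEnv is an existing TableEnvironment, and reusing the sink and table names from the question:

// Register the scalar function under the name "to_json" (the name is arbitrary).
tableEnv.createTemporarySystemFunction("to_json", RowToJson.class);
// Build the payload column from explicit (name, value) pairs.
tableEnv.executeSql(
    "INSERT INTO kinesis " +
    "SELECT 'constant_value' AS my_source, " +
    "       to_json('a', a, 'b', b, 'c', c) AS payload " +
    "FROM my_table");

Note that, unlike Spark's struct(*), the column names still have to be listed explicitly.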

If you want the UDF to be usable with grouping, your UDF class must extend AggregateFunction instead:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.AggregateFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RowsToJson extends AggregateFunction<String, List<String>> {
    @Override
    public String getValue(List<String> accumulator) {
        // Emit the accumulated objects as a JSON array.
        return accumulator.stream().collect(Collectors.joining(",", "[", "]"));
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }

    public void accumulate(List<String> acc, @DataTypeHint(inputGroup = InputGroup.ANY) Object... row) {
        if (row.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must be key/value pairs!");
        }
        // Same naive JSON-object building as in RowToJson above.
        String json = IntStream.range(0, row.length)
                .filter(index -> index % 2 == 0)
                .mapToObj(index -> {
                    String name = row[index].toString();
                    Object value = row[index + 1];
                    String jsonValue = (value instanceof Number || value instanceof Boolean)
                            ? String.valueOf(value)
                            : "\"" + value + "\"";
                    return "\"" + name + "\":" + jsonValue;
                })
                .collect(Collectors.joining(",", "{", "}"));
        acc.add(json);
    }
}
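
A usage sketch for the grouped variant, assuming the same registration pattern as above and, purely as a hypothetical example, grouping the question's table by column a:

tableEnv.createTemporarySystemFunction("rows_to_json", RowsToJson.class);
// Produces one JSON array per group, each element being one row's JSON object.
tableEnv.executeSql(
    "SELECT a, rows_to_json('b', b, 'c', c) AS payload " +
    "FROM my_table GROUP BY a");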

Starting with Flink 1.15, the JSON_OBJECT SQL function can help you create a JSON string from individual columns: Flink JSON functions

SELECT JSON_OBJECT('col1' VALUE col1, 'col2' VALUE col2) FROM my_table
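
Applied to this question, that makes the UDF unnecessary. A sketch, again assuming a TableEnvironment named tableEnv; the timestamp column is cast to STRING here to keep the value types simple:

tableEnv.executeSql(
    "INSERT INTO kinesis " +
    "SELECT 'constant_value' AS my_source, " +
    "       JSON_OBJECT('a' VALUE a, 'b' VALUE b, 'c' VALUE CAST(c AS STRING)) AS payload " +
    "FROM my_table");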
