使用. createtemporaryfunction()注册UDF时,数据类型提取不工作



我有一个表格"带有一个自定义数据类型"properties",它实现了一个HashMap<String,>并被Flink解释为

RAW('org...impl.properties.Properties', '...') 

数据类型。PropertyValue也是一个自定义数据类型。Properties类和PropertyValue类都不是POJO。

vertex_idvertex_propertiesevent_time
v1相关性=2:整数,工作日:Tuesday:字符串2021-04-27 10:21:09.999

当有人遇到这个问题时:flink注释是正确的解决方案,我只是把它们放错了。

我有像"ExtractPropertyValue"使用eval方法:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/

必须在类本身上面有一个函数提示,如:

@FunctionHint(output = @DataTypeHint(value= "RAW", bridgedTo = PropertyValue.class))
public class ExtractPropertyValue extends ScalarFunction {...}

其中PropertyValue是该函数应从表列中提取的自定义数据类型。另一个数据类型提示在eval-函数本身的上面。

@FunctionHint(input = @DataTypeHint(inputGroup = InputGroup.ANY))
public PropertyValue eval(Object propertiesO) {
Properties properties = (Properties) propertiesO;
return properties.get(propertyKey);
}

我还遇到了聚合函数的问题,它实现了accumulator方法。在这种情况下,必须使用函数提示来桥接累加器:

@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = AvgAcc.class),
input = @DataTypeHint(inputGroup = InputGroup.ANY))
public void accumulate(AvgAcc acc, Object iValueO) {...}

希望这能帮助到一些人!

最新更新