在 Java Flink 作业中使用 Python 用户定义的函数



是否可以在Java Flink作业中使用python用户定义的函数,或者无论如何来传达例如flink与java完成的转换的结果,并使用python用户定义的函数来应用一些机器学习的东西:

我知道从pyFlink你可以做这样的事情:

table_env.register_java_function("hash_code", "my.java.function.HashCode")

但是我需要做这样的事情,但从 java 添加 python 函数,或者如何将 java 转换的结果直接传递给 Python UDF Flink 作业?

我希望这些问题不要疯狂,但我需要知道是否存在以某种方式将 Flink DataStream API 与以 Java 作为主要语言的 Python Table API 进行通信? 这意味着从 Java 我需要做: 源 -> 转换 -> 接收器,但其中一些转换可以触发 Python 函数,或者 Python 函数将等待某些 Java 转换完成以对流结果执行某些操作。

我希望有人理解我在这里想做什么。

亲切问候!

Flink 1.10 中添加了对 Python UDF(用户定义函数(的支持——参见 PyFlink:在 Flink 的表 API 中引入对 UDF 的 Python 支持。例如,您可以执行以下操作:

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
my_table.select("add(a, b)")

有关更多示例等,请参阅上面链接的博客文章或稳定文档。

在 Flink 1.11(预计下周发布(中,增加了对矢量化 Python UDF 的支持,带来了与 Pandas、Numpy 等的互操作性。此版本还包括对 SQL DDL 和 SQL 客户端中的 Python UDF 的支持。有关文档,请参阅主文档。

听起来你想从Java调用Python。有状态函数 API 更完整地支持这一点 - 请参阅远程函数。但是要从Java DataStream API调用Python,我认为你唯一的选择是使用Flink 1.11中添加的SQL DDL支持。请参阅 FLIP-106 和文档。

FLIP-106 有这个例子:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

您应该能够将其转换为使用数据流 API。

此集成的示例: 在你的pom.xml中需要这种依赖,假设Flink 1.11是当前版本。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>

创建环境:

private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private StreamTableEnvironment tableEnv = getTableAPIEnv(env);
/*this SingleOutputStreamOperator will contains the result of the consumption from the  defined source*/
private SingleOutputStreamOperator<Event> stream; 

public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().getConfiguration().setString("python.files", path/function.py);
tableEnv.getConfig().getConfiguration().setString("python.client.executable", path/python);
tableEnv.getConfig().getConfiguration().setString("python.executable", path/python);
tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
/*pass here the function.py and the name of the function into the python script*/
tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
return tableEnv;
}

从要执行的转换开始,例如:

SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));
/*The result of that ProcessFunction `createUserProfile()` will be sent into the Python function to update some values of the profile and return them back into a defined function in Flink with Java: map function for example*/
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) x -> {
/*you custom code here to do the mapping*/
});
profiles.addSink(new yourCustomSinkFunction());
/*this function will process the Event and create the EventProfile class for this example but you can also use another operators (map, flatMap, etc)*/
private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
return stream.process(new UserProfileProcessFunction());
}

/*This function will receive a SingleOutputStreamOperator and sent each record to the Python function trough the TableAPI and returns a Row of String(you can change the Row type) that will be mapped back into EventProfile class*/
@FunctionHint(output = @DataTypeHint("ROW<a STRING>"))
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
Table events = tableEnv.fromDataStream(rowInput,
$("id"), $("noOfHits"), $("timestamp"))
.select("FunctionName(id, noOfHits, timestamp)");
return tableEnv.toAppendStream(events, Row.class);
}

最后

env.execute("Job Name");

调用 python 函数的示例FunctionNamefunction.py脚本:

@udf(
input_types=[
DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
],
result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
# function code here
return f"{id}|{noOfHits}|{timestamp}"

相关内容

  • 没有找到相关文章

最新更新