如何访问Flink statefun功能



flink游乐场给出了如下演示:

public final class GreeterAppServer {
public static void main(String[] args) {
final StatefulFunctions functions = new StatefulFunctions();
functions.withStatefulFunction(UserFn.SPEC);
functions.withStatefulFunction(GreetingsFn.SPEC);
final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
final Undertow httpServer =
Undertow.builder()
.addHttpListener(1108, "0.0.0.0")
.setHandler(new UndertowHttpHandler(requestReplyHandler))
.build();
httpServer.start();
}

如何通过http请求访问函数?邮递收到

当您设置像这样的有状态函数时

functions.withStatefulFunction(UserFn.SPEC); 

实际上,您将函数注册为http服务器的处理程序。

接下来要做的是定义函数将使用的端点。

static final TypeName TYPENAME = TypeName.typeNameOf("greeter.fns", "user");
static final StatefulFunctionSpec SPEC =
StatefulFunctionSpec.builder(TYPENAME)
.withValueSpecs(SEEN_COUNT, SEEN_TIMESTAMP_MS)
.withSupplier(UserFn::new)
.build();

在上面的示例中,有状态函数被注册到端点greeter.fns/user。参考编号:https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/UserFn.java

这也应该在module.yaml文件中定义如下。参考编号:https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/module.yaml

functions: greeter.fns/*

最后,将module.yaml配置为docker-compose.yml中的一个环境。参考编号:https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/docker-compose.yml

volumes:
- ./module.yaml:/module.yaml

最新更新