我是 apache-flink 的新手,我需要处理一些来自 akka 本地 scoket 的数据,这些数据正在流式传输到"ws://localhost:9000/ws">
在flink API中,我只能找到一个名为'socketTextStream'的函数。 需要主机名、端口和分隔符
前任:
DataStream<String> text = env.socketTextStream(hostname, port, "n");
如何将套接字指定为"ws://localhost:9000/ws"?
问题是socketTextStream
内部使用常规套接字,即java.net.Socket
用于与指定地址的连接。但是假设从您描述中的地址来看,您正在处理 WebSockets。不能使用常规套接字从 WebSocket 读取数据。目前,Flink 没有从 WebSockets AFAIK 创建数据流的 API。要获得您想要获得的内容,唯一要做的就是编写您自己的SourceFunction
,该将在内部使用javax.websocket-api
创建连接并从您的服务器读取数据。