我试图让 Flink 从套接字连接读取一些流文本输入。
在我的输入套接字代码中,我使用了以下内容来发布 Flink 接收的文本:
outputStream.writeUTF(new String(message.getPayload()));
但是,在 Flink 中,即使连接正常工作,我也无法读取传入的文本。我用谷歌搜索,发现我可能需要使用readUTF()
阅读文本。换句话说,这主要是由于编码。但是,这些是在InputStream
上完成的;Flink 没有这个,因为我的输入是DataStream<String>
的形式:
DataStream<String> text = env.socketTextStream("localhost", port, "n");
谁能建议我这个?谢谢!
writeUTF()
不会生成"用UTF编码的单词"。它生成具有只有readUTF()
才能理解的 16 位长字前缀的特定格式。如果你不想使用readUTF()
你也不能使用writeUTF()
。使用writeBytes()
或其他任何适合您阅读代码的内容。
您的程序试图实现什么? 这些是 Flink 的用例,如果你不打算实现其中之一,如果你只是在做低级网络通信,我建议使用 Java 套接字。
如果使用Java,它非常简单。 您需要从套接字获取数据输入流,然后将传入的字节转换回字符串。 例如,这是对我有用的东西
Socket socket = new Socket(HOST_IP, PORT);
DataInputStream in = new DataInputStream(socket.getInputStream());
byte[] buffer = new byte[256];
while ((i = in.read(buffer, 0, buffer.length)) != -1)
{
String input = new String(buffer, "UTF-8");
}