如何以编程方式将文本写入Flink套接字



我的目标是将字符串"SUCCESS"发送到套接字,并让Apache Flink的数据流获取该字符串,并用单词"SUCCESS"更新本地文本文件。我已经设置了一个">消费者数据流",它监视从终端发送到本地主机上9999端口的文本。请参阅下面的代码,了解正在工作的消费者数据流代码:

package p1;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
public class TestClientStreaming {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return value.equals("SUCCESS");
}
});
filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);
env.execute("Reading Flink Stream");
}
}

这对我有效;当我在我的终端上运行"nc-l 9999"时,只有当我在终端中键入"SUCCESS"并按Enter键时,在File-PATH中找到的">消费者文本文件"才会更新到目前为止很棒

现在,我想避免使用终端,但仍然向这个套接字写入文本,这样我的"消费者数据流"就可以接收它。这个文本可以是任何文本,因为我的"消费者数据流"将过滤掉它想要的文本,也就是"成功"。

我只知道如何写入套接字的两种方法是使用终端(如前所述(或使用Producer DataStream,该数据流不断从">生产者文本文件"中读取,并调用DataStream.writeToSocket((将读取的文本写入套接字。然后,我的"消费者数据流"会接收到这些信息,并按照预期进行操作。

是否还有其他选项可以将文本写入套接字,以便数据流可以拾取此文本?我尝试使用套接字库将"SUCCESS"写入localhost:9999,但没有成功。

此图像可能有助于可视化我想要解决的问题以及我已经解决的问题:https://ibb.co/41GzNWM

感谢您抽出时间

您需要创建一个等效的nc -l,也就是套接字服务器。

示例代码为:

public class SocketWriter {
public static void main(String[] args) {
int port = 9999;
boolean stop = false;
ServerSocket serverSocket = null;

try {
serverSocket = new ServerSocket(port);
while (!stop) {
Socket echoSocket = serverSocket.accept();
PrintWriter out =
new PrintWriter(echoSocket.getOutputStream(), true);
BufferedReader in =
new BufferedReader(
new InputStreamReader(echoSocket.getInputStream()));
out.println("Hello Amazing");
// do whatever logic you want.
in.close();
out.close();
echoSocket.close();
}
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
serverSocket.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}

最新更新