从 Flink / Kafka 中的同一套接字读取和写入



美好的一天

我正在通过 flink/kafka 接收数据(流数据)。我连接的端口与我需要将消息写回的端口相同

TCP/IP ->

Flink/Kafka Consumer -> Process Data -> 将结果发送到 kafka 主题并返回到 TCP/IP 连接

// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, 'n')
// 2. Processing Data
.....
// 3. Write result to kafka topic 
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))
// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)

连接到 URL 和端口有效。我接收并处理数据我写信给这个话题现在,我还需要写回我连接到的同一 url 和端口。{因为 URL 和端口可以同时发送和接收数据}

我让它写入另一个端口

// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())

这行得通...尝试写入同一端口时出现问题。当我使用我正在读取的同一端口时...闪烁作业失败

任何想法

问候

您可以使用

自定义SinkFunction将数据写回 URL。

stream.addSink(new SinkFunction<String>() {
    // initialise the client to send the data
    public void invoke(String value) throws Exception {
        // send here.               
    }
}

或者用SocketClientSink

env.socketTextStream("localhost", 5555).map(x => { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))

相关内容

  • 没有找到相关文章

最新更新