美好的一天
我正在通过 flink/kafka 接收数据(流数据)。我连接的端口与我需要将消息写回的端口相同
TCP/IP ->Flink/Kafka Consumer -> Process Data -> 将结果发送到 kafka 主题并返回到 TCP/IP 连接
// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, 'n')
// 2. Processing Data
.....
// 3. Write result to kafka topic
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))
// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)
连接到 URL 和端口有效。我接收并处理数据我写信给这个话题现在,我还需要写回我连接到的同一 url 和端口。{因为 URL 和端口可以同时发送和接收数据}
我让它写入另一个端口
// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())
这行得通...尝试写入同一端口时出现问题。当我使用我正在读取的同一端口时...闪烁作业失败
任何想法
问候
您可以使用
自定义SinkFunction
将数据写回 URL。
stream.addSink(new SinkFunction<String>() {
// initialise the client to send the data
public void invoke(String value) throws Exception {
// send here.
}
}
或者用SocketClientSink
env.socketTextStream("localhost", 5555).map(x => { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))