谁能分享一个Scala中的Flink Kafka示例?



谁能分享一个Scala中Flink Kafka(主要接收来自Kafka的消息)的工作示例?我知道在Spark中有一个KafkaWordCount的例子。我只需要在Flink中打印出Kafka的信息。这真的很有帮助。

下面的代码展示了如何使用Flink的Scala DataStream API从Kafka主题中读取数据:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
      .print
    env.execute("Flink Kafka Example")
  }
}

与Robert添加的内容相反,下面是一段用于向Kafka主题发送消息的应用程序代码。

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    KafkaProducer.sendMessageToKafkaTopic("localhost:9092", "topic_name")
  }    
  def sendMessageToKafkaTopic(server: String, topic:String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String,String](props)
    val record = new ProducerRecord[String,String](topic, "HELLO WORLD!")
    producer.send(record)
    producer.close()
  }
}

相关内容

  • 没有找到相关文章

最新更新