Spark Streaming Kafka to ES



我有一个火花流作业,它将通过Http请求从kafka读取并写入elastic。

我想验证Kafka的每个请求,并根据业务需要更改负载,然后写入Elastic Search。

我已经使用ES Http请求将数据推送到Elastic Search中。有人能指导我如何通过数据帧将数据写入ES吗?

代码段:

val dfInput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("group.id", sourceTopicGroupId)
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
import spark.implicits._
val resultDf = dfInput
.withColumn("value", $"value".cast("string"))
.select("value")
resultDf.writeStream.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex, msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
}
override def close(errorOrNull: Throwable): Unit = {
}
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination() //"1 second"
}

这样我们就无法实现性能。

有什么办法吗?

  • Spark版本2.3.2
  • 卡夫卡分区20
  • ES版本7.7.0

您可以使用elasticsearch-spark-20_2.11,它非常简单,有关更多信息,请访问hadoop

EsSpark.saveJsonToEs(rdd, index, conf)

最新更新