如何将 Kafka 直接流 JSON 存储到 Cassandra 中



我必须将火花流数据保存到Cassandra中。流来自 Kafka,Kafka 消息采用 JSON 格式,如下所示。

{
  "status": "NOT_AVAILABLE",
  "itemid": "550672332",
  "qty": 0,
  "lmts": "2017-11-18T10:39:21-08:00",
  "timestamp": 1511030361000
}

我在 Spark 2.2.0 中编写了以下代码来执行此操作。

case class NliEvents(itemid: String, status: String, qty: String)
def main(args: Array[String]): Unit = {
 .....
  val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val valueStream = stream.map(_.value())
    val cassandraCrud = new CassandraOperations
    import com.datastax.spark.connector._
    val columns = SomeColumns("itemid", "status", "qty")
    val keySpace = configuration.getString(env + ".cassandra.keyspace")
    val gson = new Gson()
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats
    valueStream.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        val mapped = rdd.map(records => {
          val json = parse(records)
          val events = json.extract[NliEvents]
          events
        }
        )
        mapped.saveToCassandra(keySpace, "nli_events", columns)
      }
    })
}

当我运行此代码时,我得到

java.io.NotSerializableException: org.json4s.DefaultFormats$

错误。可能是我做得不正确。

你能用

下面的代码替换你的foreach语句吗?

valueStream.mapPartitions(x => {
  val lst = scala.collection.mutable.ListBuffer[NliEvents]()
  while (x.hasNext) {
    val json = parse(x.next())
    val events = json.extract[NliEvents]
    lst += events
  }
  lst.toList.iterator
  }
).saveToCassandra(keySpace, "nli_events",columns)

它应该有效。如果您遇到任何错误,请告诉我。

相关内容

  • 没有找到相关文章

最新更新