我正在尝试将Json数据发送到kafka主题中:我有这个Scala代码:
def sendMessage(sender_id: String, receiver_id: String, content: String) = {
val newMessage = new Producer(brokers = KAFKA_BROKER, key = "1", topic = "topic_2", message = {
sender_id + receiver_id + content
})
newMessage.sendMessages()
}
我想发送sender_id,receiver_id&内容作为Json数据到我的主题,但这似乎不起作用。这是我的生产者代码
class Producer(topic: String, key: String, brokers: String, message: String) {
val producer = new KafkaProducer[String, String](configuration)
private def configuration: Properties = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props
}
def sendMessages(): Unit = {
val record = new ProducerRecord[String, String](topic, key, message)
producer.send(record)
producer.close()
}
我想把内容保存为Json,以便以后从我的消费者那里自定义检索。
这不是JSON,它是一个代码块;最重要的是,JSON有键值对。
message = {
sender_id + receiver_id + content
}
您需要添加一个外部库(如json4s
(来创建用于序列化的JSON字符串和/或事例类对象。
val json = ("sender_id" -> sender_id, ...)
producer.send(new ProducerRecord(topic, compact(render(json)))