我有一个应用程序,它应该向Kafka发送有限数量的消息,然后退出。出于某种原因,即使我关闭了制片人,卡夫卡的联系仍然存在。我的实现(在Scala中)或多或少是
object Kafka {
private val props = new Properties()
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", "sync")
props.put("metadata.broker.list", "localhost:9092")
props.put("batch.num.messages", "200")
props.put("message.send.max.retries", "3")
props.put("request.required.acks", "-1")
props.put("client.id", "myclient")
private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)
def send(msg: Message) = Try(producer.send(encode(msg)))
def close() = producer.close()
}
这里Message
是一个简单的case类,我如何将它转换为字节数组实际上并不相关。
消息确实到达了,但当我最终调用Kafka.close()
时,应用程序不会退出,连接似乎也不会释放。
有没有办法明确要求卡夫卡终止连接?
def close()=生产者.close()
这将创建一个名为"close"的函数,该函数调用producer.close()我没有看到任何证据表明你的代码实际上关闭了生产者。
你只需要打电话:生产商关闭