我如何从confluent_kafka库发起对producer.poll()的重试?



我刚开始使用kafka,不确定我是否理解它。然而,我的任务是确保它会重试,如果有任何类型的错误。例如,如果我收到的响应不是200,我已经用API完成了这一点,但是我被kafka的东西难住了。下面是我的代码,如果有帮助的话:

def send_kafka_message(payload, secrets):
"""Parses payload into message and sets on kafka producer."""
logger.info("Sending message on Kafka topic...")
producer = Producer(
{
"bootstrap.servers": CONFLUENT_SERVER,
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": CONFLUENT_USER,
"sasl.password": secrets["confluent_secret"],
"error_cb": error_cb,
}
)
producer.produce(
ORDER_TOPIC,
value=json.dumps(payload),
callback=delivery_report,
partition=abs(hash(payload["_id"])) % 6,
)
producer.poll()

默认retries金额为2147483647

Description:重试发送失败的Message的次数。注意:重试可能导致重新排序,除非enable.idempotence设置为true

你可以在这里找到所有的librdkafka设置,这些设置是用于confluence -kafka-python的。

最新更新