我正在尝试使用涉及连接的Kafka Streams应用程序对Kafka主题中的数据进行反向处理。其中一个要加入的流在相应的主题中具有单位时间内更大的数据量。我想控制来自各个主题的消费,以便从单个consumer.poll()
中的每个主题获得大致相同的事件时间戳。然而,似乎没有任何方法来控制支持源流的KafkaConsumer
的行为。还有别的办法吗?
目前Kafka无法同时控制生产者和消费者的速率限制。
参考:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-13 + - +配额但是如果你使用Apache Spark作为流处理平台,你可以限制Kafka接收器的输入速率
在消费者端可以使用consume([num_messages=1][, timeout=-1])
函数代替poll.
消耗([num_messages = 1][,超时= 1]):使用消息列表(超时时可能为空)。回调可能作为调用此方法的副作用而执行。应用程序必须检查返回的Message对象的Message.error()方法,以区分列表中每个Message的正确消息(error()返回None)和错误(详情请参阅error().code())。如果enable.partition.eof配置属性设置为True,则分区EOF事件也将作为消息公开,error().code()设置为_PARTITION_EOF。
- num_messages (int) -返回的最大消息数(默认为1)。
- timeout (float) -阻塞等待消息,事件或回调的最大时间(默认:infinite(-1))。(秒)