Python生成不同的Kafka分区



我正试图通过使用经典的Twitter流媒体示例来学习Kafka。我正试图使用我的生产者流式传输基于2个过滤器的twitter数据到同一主题的不同分区。例如,tracks='Google'到一个分区,tracks='Apple'到另一个分区的twitter数据。

class Producer(StreamListener):
def __init__(self, producer):
self.producer = producer
def on_data(self, data):
self.producer.send(topic_name, value=data)
return True
def on_error(self, error):
print(error)

twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])

如何添加另一个音轨并将数据流传输到另一个分区。

同样,我如何让我的消费者从一个特定的分区消费。

consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms =  5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))

经过一些研究,我能够解决这个问题:

在生产者端,指定分区:

self.producer.send(topic_name, value=data,partition=0)

在消费者端,

consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms =  5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.assign([TopicPartition('trial', 0)])

Kafka在消息的密钥上对数据进行分区。在给定的代码中,您只将value传递给Producer消息,因此键将为null,因此将在所有分区之间进行循环。

请参阅Kafka库的文档,了解如何为每条消息提供密钥

相关内容

  • 没有找到相关文章

最新更新