kafka-python:通过运行并发进程/脚本,同时生成和使用来自同一主题的消息



Kafka在本地设置:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

并创建了存储数据的示例测试主题:

bin/kafka-topics.sh --create --topic fortest --bootstrap-server localh
ost:9092 --replication-factor 1 --partitions 1

创建示例脚本以发送示例数据,然后从相同的测试主题中读取

import time
from kafka import KafkaProducer, KafkaConsumer
import multiprocessing
TOPIC = 'fortest'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
group_id='my-consumer-1'
)
def store_message():
for _ in range(100):
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
time.sleep(3)
def get_processed_message():
while True:
messages = consumer.poll(timeout_ms=5000)
if not messages:
print('wait for messsages')
time.sleep(5)
else:
print(f"Get messages: {messages.values()}")

它以连续的方式工作,如:

if __name__ == '__main__':
store_message()
get_processed_message()

但问题是,当生产者不断地发送消息,消费者不断地同时使用同一主题读取消息时,是否可以同时运行这两个函数?尝试使用多处理:

if __name__ == '__main__':
produce_initial_message = multiprocessing.Process(target=store_message)
consume_processed_message = multiprocessing.Process(target=get_processed_message)
produce_initial_message.start()
consume_processed_message.start()

但只有发送工作,consumer.poll()在这种情况下从不返回任何值,并一直"等待"消息。如果将Consumer初始化和逻辑移动到不同的.py脚本并在不同的终端中同时运行它们,则相同这需要如何调整才能以这种方式工作?(或者这需要除了消费者和生产者之外的一些更复杂的逻辑/额外的代理来处理?(

解决者:

  1. store_message()方法更改为使用无限循环,并在每次发送消息后使用producer.flush()
def store_message():
while True:
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
producer.flush()
time.sleep(3)
  1. 使用线程进行并发运行,而不是多处理:
if __name__ == '__main__':
t_producer = threading.Thread(target=store_message)
t_consumer = threading.Thread(target=get_processed_message)
t_producer.setDaemon(True)
t_consumer.setDaemon(True)
t_producer.start()
t_consumer.start()
while True:
pass

现在它按计划运行,谢谢。

最新更新