Kafka在本地设置:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
并创建了存储数据的示例测试主题:
bin/kafka-topics.sh --create --topic fortest --bootstrap-server localh
ost:9092 --replication-factor 1 --partitions 1
创建示例脚本以发送示例数据,然后从相同的测试主题中读取
import time
from kafka import KafkaProducer, KafkaConsumer
import multiprocessing
TOPIC = 'fortest'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
group_id='my-consumer-1'
)
def store_message():
for _ in range(100):
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
time.sleep(3)
def get_processed_message():
while True:
messages = consumer.poll(timeout_ms=5000)
if not messages:
print('wait for messsages')
time.sleep(5)
else:
print(f"Get messages: {messages.values()}")
它以连续的方式工作,如:
if __name__ == '__main__':
store_message()
get_processed_message()
但问题是,当生产者不断地发送消息,消费者不断地同时使用同一主题读取消息时,是否可以同时运行这两个函数?尝试使用多处理:
if __name__ == '__main__':
produce_initial_message = multiprocessing.Process(target=store_message)
consume_processed_message = multiprocessing.Process(target=get_processed_message)
produce_initial_message.start()
consume_processed_message.start()
但只有发送工作,consumer.poll()
在这种情况下从不返回任何值,并一直"等待"消息。如果将Consumer初始化和逻辑移动到不同的.py
脚本并在不同的终端中同时运行它们,则相同这需要如何调整才能以这种方式工作?(或者这需要除了消费者和生产者之外的一些更复杂的逻辑/额外的代理来处理?(
解决者:
- 将
store_message()
方法更改为使用无限循环,并在每次发送消息后使用producer.flush()
def store_message():
while True:
msg = b'message'
producer.send(topic=TOPIC, value=msg)
print(f'{msg} sent by Producer')
producer.flush()
time.sleep(3)
- 使用线程进行并发运行,而不是多处理:
if __name__ == '__main__':
t_producer = threading.Thread(target=store_message)
t_consumer = threading.Thread(target=get_processed_message)
t_producer.setDaemon(True)
t_consumer.setDaemon(True)
t_producer.start()
t_consumer.start()
while True:
pass
现在它按计划运行,谢谢。