尝试使用python多处理来消费消息



我无法使用以下代码消费消息。如果我直接使用consOne.startLoop(),我就可以消费。我在这里错过了什么。感谢您的帮助。

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
from multiprocessing import Process
import sys

idlist = []
def setConfig(bootstrapServers, groupId, autoOffsetReset):
consumerConf = {}
consumerConf['bootstrap.servers']   = bootstrapServers 
consumerConf['group.id']            = groupId
consumerConf['auto.offset.reset']   = autoOffsetReset
print(consumerConf)
return consumerConf
def createConsumer(consumerConf, topic):
consumer = Consumer(consumerConf)
consumer.subscribe([topic])
print("consumer subscribed to topic {}".format(topic))
return consumer

# self.consumer.assign([TopicPartition(topic, partition)])
def startLoop(consumer):
try:
while True:
message = consumer.poll(1.0)
if message is None:
print("none")
continue
elif message.error():
if message.error().code == KafkaError._PARTITION_EOF:
sys.stderr.write('EOF Partition - {}  '.format(message.partition()))
else:
sys.stderr.write('Consumer Error on Topic - {}  '.format(message.topic()))
sys.stderr.write('''-- topic        - {} 
-- partition    - {} 
-- offset       - {}'''.format(
message.topic(), message.partition(), message.offset()))
else:
print('Received message: {}'.format(message.value().decode('utf-8')))
handleMessage(message.value())
except KeyboardInterrupt:
sys.stderr.write('Kafka Exception raised - {}  '.format(message.topic()))
sys.exit(1)
finally:
consumer.close()
# body of the message or (message.vlue())
def handleMessage(body):
global idlist
idlist.append(body)
print(idlist)
if __name__ === '__main__':
config = setConfig('localhost:9092', groupId='group', 
autoOffsetReset='smallest')
consOne = createConsumer(config, 'test')
# consOne.startLoop() Works!
processOne = Process(target=startLoop, args=(consOne, ), group=None) 
# doesn't work :(
processOne.start()
processOne.join()

consumer = Consumer({'bootstrap.servers':'localhost:9092', 'group.id':'group', 'auto.offset.reset':'smallest'})
consumer.subscribe(['test'])
def startLoop():
try:
global consumer 
print(consumer)
while True:
message = consumer.poll(1.0)
if message is None:
print("none")
continue
elif message.error():
if message.error().code == KafkaError._PARTITION_EOF:
sys.stderr.write('EOF Partition - {}  '.format(message.partition()))
else:
sys.stderr.write('Consumer Error on Topic - {}  '.format(message.topic()))
sys.stderr.write('''-- topic        - {} 
-- partition    - {} 
-- offset       - {}'''.format(
message.topic(), message.partition(), message.offset()))
else:
print('Received message: {}'.format(message.value().decode('utf-8')))
# handleMessage(message.value())
except KeyboardInterrupt:
sys.stderr.write('Kafka Exception raised - {}  '.format(message.topic()))
sys.exit(1)
finally:
consumer.close()
if __name__ == '__main__':
processOne = Process(target=startLoop, group=None) 
# still consumes message with startLoop() but not with processOne.start()
# startLoop() 
processOne.start()
processOne.join()

可能您使用multiprocessing的方式不对。官方文件的一个例子。

确保新的Python解释器可以安全地导入主模块,而不会产生意外的副作用(例如启动新进程(。主模块的安全导入|编程指南

因此,有必要在if __name__ == '__main__':中启动一个进程。

相关内容

  • 没有找到相关文章

最新更新