我无法使用以下代码消费消息。如果我直接使用consOne.startLoop()
,我就可以消费。我在这里错过了什么。感谢您的帮助。
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
from multiprocessing import Process
import sys
idlist = []
def setConfig(bootstrapServers, groupId, autoOffsetReset):
consumerConf = {}
consumerConf['bootstrap.servers'] = bootstrapServers
consumerConf['group.id'] = groupId
consumerConf['auto.offset.reset'] = autoOffsetReset
print(consumerConf)
return consumerConf
def createConsumer(consumerConf, topic):
consumer = Consumer(consumerConf)
consumer.subscribe([topic])
print("consumer subscribed to topic {}".format(topic))
return consumer
# self.consumer.assign([TopicPartition(topic, partition)])
def startLoop(consumer):
try:
while True:
message = consumer.poll(1.0)
if message is None:
print("none")
continue
elif message.error():
if message.error().code == KafkaError._PARTITION_EOF:
sys.stderr.write('EOF Partition - {} '.format(message.partition()))
else:
sys.stderr.write('Consumer Error on Topic - {} '.format(message.topic()))
sys.stderr.write('''-- topic - {}
-- partition - {}
-- offset - {}'''.format(
message.topic(), message.partition(), message.offset()))
else:
print('Received message: {}'.format(message.value().decode('utf-8')))
handleMessage(message.value())
except KeyboardInterrupt:
sys.stderr.write('Kafka Exception raised - {} '.format(message.topic()))
sys.exit(1)
finally:
consumer.close()
# body of the message or (message.vlue())
def handleMessage(body):
global idlist
idlist.append(body)
print(idlist)
if __name__ === '__main__':
config = setConfig('localhost:9092', groupId='group',
autoOffsetReset='smallest')
consOne = createConsumer(config, 'test')
# consOne.startLoop() Works!
processOne = Process(target=startLoop, args=(consOne, ), group=None)
# doesn't work :(
processOne.start()
processOne.join()
consumer = Consumer({'bootstrap.servers':'localhost:9092', 'group.id':'group', 'auto.offset.reset':'smallest'})
consumer.subscribe(['test'])
def startLoop():
try:
global consumer
print(consumer)
while True:
message = consumer.poll(1.0)
if message is None:
print("none")
continue
elif message.error():
if message.error().code == KafkaError._PARTITION_EOF:
sys.stderr.write('EOF Partition - {} '.format(message.partition()))
else:
sys.stderr.write('Consumer Error on Topic - {} '.format(message.topic()))
sys.stderr.write('''-- topic - {}
-- partition - {}
-- offset - {}'''.format(
message.topic(), message.partition(), message.offset()))
else:
print('Received message: {}'.format(message.value().decode('utf-8')))
# handleMessage(message.value())
except KeyboardInterrupt:
sys.stderr.write('Kafka Exception raised - {} '.format(message.topic()))
sys.exit(1)
finally:
consumer.close()
if __name__ == '__main__':
processOne = Process(target=startLoop, group=None)
# still consumes message with startLoop() but not with processOne.start()
# startLoop()
processOne.start()
processOne.join()
可能您使用multiprocessing
的方式不对。官方文件的一个例子。
确保新的Python解释器可以安全地导入主模块,而不会产生意外的副作用(例如启动新进程(。主模块的安全导入|编程指南
因此,有必要在if __name__ == '__main__':
中启动一个进程。