我还有一个无法解决的问题:我有一个 Hazelcast 集群,想在每个单独的子进程中各创建一个 Hazelcast 客户端。
#test py
import os
import logging
# pip install hazelcast-client-python
import hazelcast
from Customer import Customer
from datetime import datetime
from multiprocessing import Process, Pool
# Module-level statements: they sit outside the `__main__` guard, so they run
# on import as well as when the file is executed as a script.
print "starting..."
# Basic logging setup (default handler on the root logger) to see client logs.
logging.basicConfig()
# Uncomment the next line for verbose (DEBUG-level) output:
# logging.getLogger().setLevel(logging.DEBUG)
def queue_parse_new():
    """Drain the "customers" Hazelcast queue and print each item's ``foo``.

    Creates a new client for the cluster member at 192.168.200.245:5701,
    then drains the queue in batches of up to 10000 items for as long as
    the queue reports that it is not empty. For every drained item,
    ``item.foo`` is printed followed by the id of the current process.

    The client is always shut down before returning, including when an
    error is raised while draining or printing.
    """
    config = hazelcast.ClientConfig()
    # Register Customer under its factory/class ids -- presumably so that
    # queue entries deserialize into Customer instances; verify against
    # the Customer module.
    portable_factory = {Customer.CLASS_ID: Customer}
    config.serialization_config.add_portable_factory(Customer.FACTORY_ID, portable_factory)
    # Cluster member address in 'host:port' form, e.g. 'localhost:5701'.
    config.network_config.addresses.append('192.168.200.245:5701')
    print("retrieving client")
    client = hazelcast.HazelcastClient(config)
    try:
        print("client is retrieved")
        queue = client.get_queue("customers").blocking()
        # The pid cannot change while this function runs; look it up once.
        pid = str(os.getpid())
        while not queue.is_empty():
            batch = []
            queue.drain_to(batch, 10000)
            for customer in batch:
                print(customer.foo + ' ' + pid)
    finally:
        # Release the client's connections even if draining failed.
        client.shutdown()
# Script entry point: intended to drain the queue from three separate processes.
# NOTE(review): as written it does not do that. `Process(queue_parse())` calls
# the function right here, in the parent process, and passes its return value
# to Process as the first positional argument (`group`); no `target` is set,
# so start() has nothing to run in the child. Pass the function object itself
# instead, e.g. Process(target=queue_parse_new).
# NOTE(review): `queue_parse` is not defined in this snippet (only
# `queue_parse_new` is), so the first Process line would raise NameError.
if __name__ == '__main__':
proc = Process(queue_parse())
proc1 = Process(queue_parse_new())
proc2 = Process(queue_parse())
proc.start()
proc1.start()
proc2.start()
我修改了您的示例,现在可以正常运行了。我用整数代替了您的 Customer 对象。
import multiprocessing
import logging
import hazelcast
from multiprocessing import Process
def fill_data():
    """Fill the "customers" queue with integer test data.

    Connects a client to 127.0.0.1:5701, adds the integers 0..9999 to the
    queue ten times (100000 items added in total), prints the resulting
    queue size and shuts the client down.

    The client is shut down even if adding the data fails.
    """
    config = hazelcast.ClientConfig()
    config.network_config.addresses.append('127.0.0.1:5701')
    client = hazelcast.HazelcastClient(config)
    try:
        queue = client.get_queue("customers").blocking()
        batch = list(range(10000))
        for _ in range(10):
            queue.add_all(batch)
        # Single-argument print: same output on Python 2 and Python 3.
        print("test data ready %s" % queue.size())
    finally:
        client.shutdown()
def queue_parse():
    """Drain the "customers" queue in batches and report each batch size.

    Meant to be used as a ``multiprocessing.Process`` target: every call
    creates its own client for 127.0.0.1:5701, drains up to 10000 items at
    a time for as long as the queue reports that it is not empty, and
    prints the size of each drained batch together with the name of the
    current process.

    The client is always shut down before returning, including when an
    error is raised while draining.
    """
    config = hazelcast.ClientConfig()
    config.network_config.addresses.append('127.0.0.1:5701')
    worker_name = multiprocessing.current_process().name
    client = hazelcast.HazelcastClient(config)
    try:
        queue = client.get_queue("customers").blocking()
        while not queue.is_empty():
            batch = []
            # Another process may empty the queue between is_empty() and
            # this call, in which case the batch is simply empty.
            queue.drain_to(batch, 10000)
            # Single-argument print: same output on Python 2 and Python 3.
            print("drained batch with size: %s from process: %s" % (len(batch), worker_name))
    finally:
        client.shutdown()
if __name__ == '__main__':
    # Entry point: load test data once, then drain it from three processes.
    print("starting...")
    # Basic logging setup to see client logs.
    logging.basicConfig()
    # Uncomment the next line for verbose (DEBUG-level) output:
    # logging.getLogger().setLevel(logging.DEBUG)
    fill_data()
    # Pass the function object itself as `target` (no parentheses) so that it
    # runs inside the child process; `name` is what queue_parse prints.
    proc1 = Process(target=queue_parse, name="process-1")
    proc2 = Process(target=queue_parse, name="process-2")
    proc3 = Process(target=queue_parse, name="process-3")
    workers = (proc1, proc2, proc3)
    for worker in workers:
        worker.start()
    # Wait for every worker explicitly instead of relying on interpreter exit.
    for worker in workers:
        worker.join()
代码中最重要的改动在于创建进程的方式:
proc1 = Process(target=queue_parse, name="process-1")
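也就是说:target 传入的是不带括号的函数名(即函数对象本身,而不是调用它之后的返回值),另外还指定了一个进程名称,用于在日志输出中区分各个进程。如果写成 Process(queue_parse()),函数会在父进程中立即执行,子进程则没有任何目标可运行。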