python multiprocessing work with hazelcast



我遇到了一个无法解决的问题:我有一个 Hazelcast 集群,想在每个独立的子进程中各创建一个 Hazelcast 客户端。

#test py
import os
import logging
# pip install hazelcast-python-client
import hazelcast 
from Customer import Customer
from datetime import datetime
from multiprocessing import Process, Pool
# Runs at import time: these statements are at module level, not inside a
# function or an `if __name__ == '__main__'` guard.
print "starting..."
# basic logging setup to see client logs
logging.basicConfig()
# Uncomment to raise the root logger's verbosity to DEBUG.
# logging.getLogger().setLevel(logging.DEBUG)

def queue_parse_new():
    config = hazelcast.ClientConfig()
    portable_factory = {Customer.CLASS_ID: Customer}
    config.serialization_config.add_portable_factory(Customer.FACTORY_ID, portable_factory)
    # Hazelcast.Address is the hostname or IP address, e.g. 'localhost:5701'
    config.network_config.addresses.append('192.168.200.245:5701')
    print "retrieving client"
    client = hazelcast.HazelcastClient(config)
    print "client is retrieved"
    queue = client.get_queue("customers").blocking()
    while not queue.is_empty():
        list_of_dto = []
        queue.drain_to(list_of_dto, 10000)
        for i in list_of_dto:
            print i.foo + ' ' + str(os.getpid())
    else:
        client.shutdown()

if __name__ == '__main__':
    # NOTE(review): each argument below is a *call* (trailing parentheses),
    # so the function runs to completion here, in the parent process, before
    # Process() is constructed. Its return value is then passed as the first
    # positional parameter of Process, which is `group`, not `target`, so the
    # children started below have no target to execute.
    # The intended form is Process(target=<function>) without parentheses.
    # NOTE(review): `queue_parse` is not defined in this snippet (only
    # `queue_parse_new` is) -- confirm the intended function name.
    proc = Process(queue_parse())
    proc1 = Process(queue_parse_new())
    proc2 = Process(queue_parse())
    proc.start()
    proc1.start()
    proc2.start()

我修改了您的示例,现在可以正常运行了。我用整数代替了您的 Customer 对象。

import multiprocessing
import logging
import hazelcast
from multiprocessing import Process

def fill_data():
    config = hazelcast.ClientConfig()
    config.network_config.addresses.append('127.0.0.1:5701')
    client = hazelcast.HazelcastClient(config)
    queue = client.get_queue("customers").blocking()
    lst = [i for i in range(0, 10000)]
    for i in range(0, 10):
        queue.add_all(lst)
    print "test data ready", queue.size()
    client.shutdown()

def queue_parse():
    config = hazelcast.ClientConfig()
    config.network_config.addresses.append('127.0.0.1:5701')
    current_process = multiprocessing.current_process()
    client = hazelcast.HazelcastClient(config)
    queue = client.get_queue("customers").blocking()
    while not queue.is_empty():
        list_of_dto = []
        queue.drain_to(list_of_dto, 10000)
        print "drained batch with size:", len(list_of_dto), "from process:", current_process.name
    else:
        client.shutdown()
if __name__ == '__main__':
    print "starting..."
    # basic logging setup to see client logs
    logging.basicConfig()
    # logging.getLogger().setLevel(logging.DEBUG)
    fill_data()
    proc1 = Process(target=queue_parse, name="process-1")
    proc2 = Process(target=queue_parse, name="process-2")
    proc3 = Process(target=queue_parse, name="process-3")
    proc1.start()
    proc2.start()
    proc3.start()

代码中最重要的改动在于创建进程的方式。

# `target` is the function object itself (no parentheses, so it is not called
# here); `name` sets the process name.
proc1 = Process(target=queue_parse, name="process-1")

即:target 传入的是不带括号的函数名(函数对象本身,而不是函数的调用结果),另外还指定了一个进程名称,用于在日志输出中区分各个进程。

相关内容

  • 没有找到相关文章

最新更新