我有一个使用字典中的值循环的函数。我想拆分我的字典键,这样我就可以在等于我的 cpu 的部分中断我的字典。我的功能是:
def find_something2(new2, threl=2.0, my_limit=150, far=365):
""" Find stocks tha are worth buying"""
global current_date, total_money, min_date, current_name, dates_dict, mylist, min_date_sell, reduced_stocks
worthing = list()
for stock in new2:
frame = reduced_stocks[stock]
temp = frame.loc[current_date:end_date]
if not temp.empty:
mydate = temp.head(far).Low.idxmin()
if mydate <= min_date_sell:
my_min = temp.head(far).Low.min()
if total_money >= my_min > 0: # find the min date at four months
ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
thres=threl, sell_limit=my_limit)
if ans:
if income > 3 * 10 ** 6:
worthing.append([mydate, stock, res, when_sell, total, income])
if current_date > '1990-01-01':
return sorted(worthing, key=itemgetter(0))
elif current_date > '1985-01-01':
return sorted(worthing, key=itemgetter(0))
else:
answer = sorted(worthing, key=itemgetter(5), reverse=True)
return answer[::11]
所以我尝试的是:
import multiprocessing as mp
result_list = []
def log_result(result):
# This is called whenever foo_pool(i) returns a result.
# result_list is modified only by the main process, not the pool workers.
global result_list
result_list.append(result)
def apply_async_with_callback():
global reduced_stocks
temp = list(reduced_stocks.keys())
temp1 = temp[0:1991]
temp2 = temp[1991:]
temp = [temp1, temp2]
pool = mp.Pool(2)
for i in temp:
pool.apply_async(find_something2, args=(i, 1.1, 2200, 1,), callback=log_result)
pool.close()
pool.join()
print(result_list)
if __name__ == '__main__':
apply_async_with_callback()
这是正确的方法吗?
我也尝试过线程,但 cpu 在 15% 时使用 12 个线程(我有 6 个英特尔内核)
def pare():
relist = list(reduced_stocks.keys())
sublist = [relist[x:x+332] for x in range(0, len(relist), 332)]
data = [x for x in sublist]
threads = list()
from threading import Thread
for i in range(12):
process = Thread(target=find_something2, args=(1.4,2500,8,data[i],i,results))
process.start()
threads.append(process)
for process in threads:
process.join()
进行多处理的一种方法是创建一个Pool
并将准备好的数据传递给它。等待计算完成并处理结果。该代码建议如何做到这一点。
# setup the function so it gets everything from arguments
def find_something2(new2, threl, my_limit, far, current_date, total_money, min_date_sell, reduced_stocks, end_date):
# ....
pass
# prepare the data
# replace the a1, a2 ... with the actual parameters your function takes
data = [(a1, a2, a3, ...) for your_data in your_dict]
import multiprocessing as mp
with mp.Pool() as pool:
results = pool.starmap(find_something2, data)
print(results)