多重处理中分配的返回值



我在开发一个candlestick解释器时遇到了一个问题。软件迭代作为货币的行,并在每个行内启动流程,以检查不同时间段上的特定烛台模式。

points是一个字典,键是字符串格式的时间框架,值是它排名的点,这些信息也在最后呈现到文件中。

我遇到的问题是它看起来像";bdd";始终为空,因此不会将任何值写入点字典。

import talib
import yfinance as yf
import operator
from multiprocessing import Queue, Process
from datetime import datetime, timedelta

def Fetch(bdd, timeframe, line, y1d, y1wk, points):
if(timeframe == "15m"):
data_15m = yf.download(line, start=df(y1d), interval="15m", threads=1)
if data_15m.empty is False:
points["15m"] = GetMarketPoints(data_15m, line)
bdd["15m"] = (points["15m"], data_15m)
#queue.put((points["15m"], data_15m))
#return (points["15m"], data_15m)
elif(timeframe == "30m"):
data_30m = yf.download(line, start=df(y1d), interval="30m", threads=1)
if data_30m.empty is False:
points["30m"] = GetMarketPoints(data_30m, line)
bdd["30m"] = (points["30m"], data_30m)
#queue.put((points["30m"], data_30m))
#return (points["30m"], data_30m)     
elif(timeframe == "1h"):
data_1h = yf.download(line, start=df(y1d), interval="1h", threads=1)
if data_1h.empty is False:
points["1h"] = GetMarketPoints(data_1h, line)
bdd["1h"] = (points["1h"], data_1h)
#queue.put((points["1h"], data_1h))
#return (points["1h"], data_1h)
elif(timeframe == "1d"):
data_1d = yf.download(line, start=df(y1d), interval="1d", threads=1)
if data_1d.empty is False:
points["1d"] = GetMarketPoints(data_1d, line)
bdd["1d"] = (points["1d"], data_1d)
#queue.put((points["1d"], data_1d))
#return (points["1d"], data_1d)
else:
data_1wk = yf.download(line, start=df(y1wk), interval="1wk", threads=1)
if data_1wk.empty is False:
points["1wk"] = GetMarketPoints(data_1wk, line)
bdd["1wk"] = (points["1wk"], data_1wk)
#queue.put((points["1wk"], data_1wk))
#return (points["1wk"], data_1wk)
#Dateformat
def df(date):
return date.strftime('%Y-%m-%d')
def GetMarketPoints(data, line):
point = 0
MorningDojiStar = talib.CDLMORNINGDOJISTAR(data['Open'], data['High'], data['Low'], data['Close'])
data['Morning Doji Star'] = MorningDojiStar
morningdojistar_days = data[data['Morning Doji Star'] > 0]
if (morningdojistar_days.empty is False):
point += 1
print(line + " Morning Doji Star")
if data[data['Morning Doji Star'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
Harami = talib.CDLHARAMI(data['Open'], data['High'], data['Low'], data['Close'])
data['Harami'] = Harami
harami_days = data[data['Harami'] > 0]
if (harami_days.empty is False):
point += 1
print(line + " Harami")
if data[data['Harami'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
Piercing = talib.CDLPIERCING(data['Open'], data['High'], data['Low'], data['Close'])
data['Piercing'] = Piercing
piercing_days = data[data['Piercing'] > 0]
if (piercing_days.empty is False):
point += 1
print(line + " Piercing")
if data[data['Piercing'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
MorningStar = talib.CDLMORNINGSTAR(data['Open'], data['High'], data['Low'], data['Close'])
data['Morning Star'] = MorningStar
morningstar_days = data[data['Morning Star'] > 0]
if (morningstar_days.empty is False):
point += 1
print(line + " Morning Star")
if data[data['Morning Star'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
Engulfing = talib.CDLENGULFING(data['Open'], data['High'], data['Low'], data['Close'])
data['Engulfing'] = Engulfing
engulfing_days = data[data['Engulfing'] > 0]
if (engulfing_days.empty is False):
point += 1
print(line + " Engulfing")
if data[data['Engulfing'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
Hammer = talib.CDLHAMMER(data['Open'], data['High'], data['Low'], data['Close'])
data['Hammer'] = Hammer
hammer_days = data[data['Hammer'] > 0]
if (hammer_days.empty is False):
point += 1
print(line + " Hammer")
if data[data['Hammer'] < 0].empty is False:
print("Negative pattern. Ignores Points")
point -= 1
return point
def TheBigFunction():
#Point Weights
pointweights = {
"15m": 0.25,
"30m": 0.5,
"1h": 1,
"1d": 24,
"1wk": 168
}
bdd = {}
queue = Queue()
f = open("crypto.txt","r")
lines = f.readlines()
to_buy = {}
THRESHOLD = 2
presentday = datetime.now() # or presentday = datetime.today()
jobs = []
y1d = presentday - timedelta(days=1)
y1wk = presentday - timedelta(days=7)
# Write File
day = '{:%d-%m-%Y}'.format(datetime.today())
for line in lines:

print(line + "n")
#Initialize to Zero
totalPoints = 0
points = {
"15m": 0,
"30m": 0,
"1h": 0,
"1d": 0,
"1wk": 0
}
timeframes = ["15m", "30m", "1h", "1d", "1wk"]
dataframes = {}
for key in points.keys():
print("q2" + str(queue))

print("bdd" +  str(bdd))
proc = Process(target=Fetch, args=(bdd, key, line, y1d, y1wk, points))
jobs.append(proc)
proc.start()
for proc in jobs:
proc.join()    
print("q2" + str(queue))

print(bdd.values())
for i, returnValue in enumerate(bdd):
points[timeframes[i]], dataframes[timeframes[i]] = returnValue
totalPoints += points[timeframes[i]] * pointweights[timeframes[i]]
if totalPoints > THRESHOLD:
filename = str(day) +".txt"
f = open(filename, "a+")
f.write("TICKER: " + line + " Points: " + str(totalPoints) + "n")
f.write("15M: " + str(points["15m"]) + "n")
f.write("30M: " + str(points["30m"]) + "n")
f.write("1H: " + str(points["1h"]) + "n")
f.write("1D: " + str(points["1d"]) + "n")
f.write("1wk: " + str(points["1wk"]) + "n")
f.write("n")
f.close()
print(f);
if __name__ == '__main__':
TheBigFunction()

进程不共享变量。Process()不发送bcc作为参考,而仅发送值(使用文件pickle(。所以每个进程都有自己的bcc,如果你在Fetch中设置bcc中的值,那么它就不能改变主进程中的bcc。您必须使用queue将结果发送回主进程。并且需要将queue发送到Fetch

from multiprocessing import Queue, Process
def fetch(value, queue):  # PEP8: `lower_case_names` for functions
queue.put( (value, "some data") )
# --- main ---
points = {
"15m": 0,
"30m": 0,
"1h": 0,
"1d": 0,
"1wk": 0
}
q = Queue()
jobs = []
for key in points.keys():
# send `queue` to `fetch`
proc = Process(target=fetch, args=(key, q))
jobs.append(proc)
proc.start()
for proc in jobs:
proc.join()    
# get all results from processes
while not q.empty():    
result = q.get()
print(result)

BTW:

jobs = []应该在for line-循环内,但你有它在for line-循环外

作为@furas:提出的解决方案的替代方案

您可以传递管理的字典,而不是常规字典:

from multiprocessing import Manager
...
def TheBigFunction():
...
manager = Manager()
bdd = manager.dict() # a managed dictionary
... # etc.

变量bdd现在是一个代理对象。对bdd的方法调用或多或少相当于对执行manager.Manager()语句时启动的服务器进程的远程方法调用。底层通信机制是套接字(Linux(或命名管道(Windows(。因此,性能显然与处理常规字典时不同,然而,常规字典在您的情况下不起作用。

相关内容

  • 没有找到相关文章

最新更新