我想在下面的代码片段中合并基于进程的并发性或基于线程的并发性(包含文件读取、计算和文件写入部分)——当(I)在大量交易工具上操作时,和/或(ii)当需要包括额外的基于时间框架的信号线以写入.npy文件时,这将是至关重要的。如果可能的话。在多处理中合并多线程将是一个很好的奖励,但我很乐意单独使用任何一个。
import numpy as np
import pandas as pd
import MetaTrader5 as mt5
marketFeedArrayLength = 364320
timeFrame = mt5.TIMEFRAME_M1
symbols_MultiTFMACD = ['AUDCAD', 'AUDJPY', 'AUDNZD', 'AUDUSD', 'CADJPY', 'EURAUD', 'EURCAD']
m1_macdLine_fastEMA_period_1 = 1
m1_macdLine_slowEMA_period_480 = 480
m1_macdLine_slowEMA_period_840 = 840
def multiTF_MACD():
# Using for loop to iterate over each selected currency symbol.
for i in symbols_MultiTFMACD:
"Assign the relevant data column ('Close' price i.e. index 4) to a variable."
closeDataForEMA = np.load("%s.npy" % (i))[:,4]
"The tailored MACD data:"
m1_closeDataForEMA = np.flip(closeDataForEMA[-1::-timeFrame])
"Calculate the MACDs."
weightsFast_1 = np.exp(np.linspace(-1., 0., m1_macdLine_fastEMA_period_1))
weightsSlow_480 = np.exp(np.linspace(-1., 0., m1_macdLine_slowEMA_period_480))
weightsSlow_840 = np.exp(np.linspace(-1., 0., m1_macdLine_slowEMA_period_840))
# Normalize the weights.
weightsFast_1 /= weightsFast_1.sum()
weightsSlow_480 /= weightsSlow_480.sum()
weightsSlow_840 /= weightsSlow_840.sum()
# Third - Calculate the EMAs.
m1_fastEMA_1 = np.pad(np.convolve(weightsFast_1, m1_closeDataForEMA, mode='valid'), (m1_macdLine_fastEMA_period_1 - 1, 0))
m1_slowEMA_480 = np.pad(np.convolve(weightsSlow_480, m1_closeDataForEMA, mode='valid'), (m1_macdLine_slowEMA_period_480 - 1, 0))
m1_slowEMA_840 = np.pad(np.convolve(weightsSlow_840, m1_closeDataForEMA, mode='valid'), (m1_macdLine_slowEMA_period_840 - 1, 0))
# Get the MACD lines:
m1_macdLine_1_480 = m1_fastEMA_1 - m1_slowEMA_480
m1_macdLine_1_840 = m1_fastEMA_1 - m1_slowEMA_840
"Append the the MACD lines to columns in the .npy rate files"
# Create a file path capturing all the names.
file_path = "%s.npy" % (i)
# (i) Use numpy.load() to call the .npy rate files into Pandas.DataFrame().
npyToPandas = pd.DataFrame(np.load(file_path))
# (ii) Add the MACD lines as columns into their respective currency data frames.
npyToPandas['8'] = m1_macdLine_1_480
npyToPandas['9'] = m1_macdLine_1_840
# (iii) Covert the DF to an array and save as .npy format.
stackedFile_MultiTFMACD = np.save(i,
(pd.DataFrame(npyToPandas).to_numpy()),
allow_pickle=True, fix_imports=False)
multiTF_MACD()
Numpy和pandas已经在底层做了相当多的线程,您不太可能从尝试在此基础上做更多的事情获得多少好处。你可以在不同的进程中运行每个符号,但这很可能不是你的瓶颈。
如果你将for i in symbols_MultiTFMACD:
循环中的所有内容转换为def process_symbol(symbol):
这样的函数,你可以很容易地将其分配给具有concurrent.futures的单独进程
import concurrent.futures
with concurrent.futures.ProcessPoolExecutor() as executor:
for symbol in symbols_MultiTFMACD:
executor.submit(process_symbol, symbol)
如果这不起作用,我建议您分析代码以找到真正的瓶颈,或者考虑使用像Spark这样的分布式处理框架。