我有我的简化代码,它完全运行到最后,并将正常加入流程,为了减少您的困惑,我用更简单的部分进行了替换:
import math
import os
import numpy as np
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
import csv
def subu(remo,queue):
remlist=[[remo,4.6*remo,4.7*remo],[7.4*remo,6.5*remo,8.1*remo]]
queue.put(remlist)
return #remlist
thread_num=10
stime=time.perf_counter()
processes=[]
if __name__=="__main__":
q=Queue()
for i in range(thread_num):
proc=Process(target=subu,args=(i,q))
proc.start()
processes.append(proc)
print('afterstart')
for proc in processes:
proc.join()
lastlist=[]
print('afterjoin')
while not q.empty():
print('inqnotemp')
# time.sleep(10)
for _ in range(thread_num):
lastlist.append(q.get())
time.sleep(.1)
with open(f'lastlist.csv',"w",newline="") as f:
cw = csv.writer(f)
cw.writerows(r+[""] for r in lastlist)
ftime=time.perf_counter()
print(f'total process in{ftime-stime}s finished')
input()
但是下面的代码,我的主代码不加入进程,除了不加入进程之外,代码还可以:
import math
import os
import numpy as np
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
import csv
def dadesazlistfake11(dlist):
zlist=[]
for _ in range(11):
for i in range(len(dlist[575])):
zlist.append(dlist[575][i])
for j in range(len(columha[0])):
if(j%2==0):
zlist.append(max(dlist[i][1] for i in range(575+columha[2][j] ,575+ columha[3][j]+1)))
if(j%2==1):
zlist.append(min(dlist[i][2] for i in range(575+columha[2][j] ,575+ columha[3][j]+1)))
return zlist
def subu(remo,slist,queue):
rawlist=[]
remlist=[]
periodend=len(slist)-24
for l in range(575,periodend):
remlist.append(dadesazlistfake11(slist[l-575:l+25]))
if (l-576)%10==0:
print(f'{remo,l-576}')
ftime=time.perf_counter()
print(f'in{ftime-stime}s finished')
queue.put(remlist)
return #remlist
其余代码如下:
def splito(a,b,c,d,e):
#bazehaye entehaei nemikhahim tu loop bashan
period=np.zeros(e+1).astype(int)
period[0]= a+c-1 #575
period[e]= b-d-1 #489688
delta=int(math.floor((period[e]-period[0])/e))
for i in range(1,e):
period[i]=period[0]+i*delta
return period
thread_num=10
rawlist=np.arange(800*5).reshape(-1,5)
rawlist=list(rawlist)
colnamesdropped=['datetime','start','15m1high','15m1low','end']
times=[['15m','30m','1H','6H','1d'],[1,2,4,24,96]]
nexttimes=[['1H','6H'],[4,24]]
columha=[[],[],[],[]]
s=0
l=0
for j in range(0,len(times[0])):
for k in range(1,7):
columha[0].append(s+6)
columha[1]=1
columha[2].append(-(k-1)*times[1][j]-times[1][j]+1)
columha[3].append(-(k-1)*times[1][j])
s=s+1
columha[0].append(s+6)
columha[1]=1
columha[2].append(-(k-1)*times[1][j]-times[1][j]+1)
columha[3].append(-(k-1)*times[1][j])
s=s+1
for j in range(0,len(nexttimes[0])):
if j==0:
for k in range(1,7):
columha[0].append(s+6)
columha[1]=1
columha[2].append( +1+(k-1)*nexttimes[1][j])
columha[3].append( +1+(k-1)*nexttimes[1][j]+nexttimes[1][j]-1)
s=s+1
columha[0].append(s+6)
columha[1]=1
columha[2].append( +1+(k-1)*nexttimes[1][j])
columha[3].append( +1+(k-1)*nexttimes[1][j]+nexttimes[1][j]-1)
s=s+1
elif j==1:
k=1
columha[0].append(s+6)
columha[1]=1
columha[2].append( +1+(k-1)*nexttimes[1][j])
columha[3].append( +1+(k-1)*nexttimes[1][j]+nexttimes[1][j]-1)
s=s+1
columha[0].append(s+6)
columha[1]=1
columha[2].append( +1+(k-1)*nexttimes[1][j])
columha[3].append( +1+(k-1)*nexttimes[1][j]+nexttimes[1][j]-1)
s=s+1
主代码的最后一部分:
period=splito(0,len(rawlist),96*6,24,thread_num)
tupleinput=[tuple([i,rawlist[period[i]-575:period[i+1]+24]]) for i in range(thread_num)]
stime=time.perf_counter()
processes=[]
if __name__=="__main__":
q=Queue()
for i in range(thread_num):
proc=Process(target=subu,args=(tupleinput[i][0],tupleinput[i][1],q))
proc.start()
processes.append(proc)
# rawlist=[]
# time.sleep(0.1)
print('afterstart')
for proc in processes:
proc.join()
# print('',)
lastlist=[]
print('afterjoin')
while not q.empty():
print('inqnotemp')
# time.sleep(10)
for _ in range(thread_num):
lastlist.append(q.get())
with open(f'lastlist.csv',"w",newline="") as f:
cw = csv.writer(f)
cw.writerows(r+[""] for r in lastlist)
ftime=time.perf_counter()
print(f'total process in{ftime-stime}s finished')
input()
什么问题?我使用的是windows和pyhton 3.8,并通过运行.py保存的文件。
如果我没有错,Process.join()
会在终止进程之前等待进程完成。如果你想突然停止进程,请执行Process.terminate()