我有一个程序可以模拟整个棒球赛季,但每场比赛都要做大量计算,因此模拟一场比赛大约需要 30 秒。一个赛季有 2430 场比赛,所以模拟一个赛季大约需要 20 个小时。显然,我想加快速度,而最直接的解决方案似乎是多进程(multiprocessing)。我可以手动把比赛分成每组约 600 场的四组,然后同时运行四个进程,但我想弄清楚 multiprocessing 模块是如何工作的。
这是我到目前为止尝试过的,但显然它不起作用。
def test_func():
    """Simulate every game of one season and store the results in SQLite.

    Prompts for the season year, loads that season's rows from the
    ``gamelogs_<season>`` table and, for each game that has no matching row
    in ``pemstein_results_<season>`` yet, runs ``PemsteinSimulation`` and
    inserts its outcome.

    Raises:
        ValueError: if the entered season is not made up of digits only.
    """
    algorithm_selection = 1
    # Create sqlite database connection
    conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
    try:
        c = conn.cursor()
        season = input('Year to simulate: ')
        # The season becomes part of a table name, which cannot be bound as a
        # query parameter, so restrict it to digits before building any SQL.
        if not season.isdigit():
            raise ValueError('Season must be a year made of digits, got %r' % season)
        c.execute('SELECT * FROM gamelogs_' + season)
        season_games = c.fetchall()
        for game_num, game in enumerate(season_games, start=1):
            # Get away lineup in terms of MLB IDs (columns 105, 108, ..., 129)
            away_lineup = ConvertLineup(*game[105:130:3])
            # Get home lineup in terms of MLB IDs (columns 132, 135, ..., 156)
            home_lineup = ConvertLineup(*game[132:157:3])
            # Get away starting pitcher and hand in terms of MLB ID
            away_pitcher_id, away_pitcher_hand = GetPitcherIDandHand(game[101])[0][:2]
            # Get home starting pitcher and hand in terms of MLB ID
            home_pitcher_id, home_pitcher_hand = GetPitcherIDandHand(game[103])[0][:2]
            # Get the date of the game
            game_date = game[0]
            if algorithm_selection != 1:
                continue
            # Check if the current game has already been evaluated and entered
            # into the database. Values are bound as parameters instead of
            # being concatenated into the statement.
            c.execute(
                'SELECT * FROM pemstein_results_' + season +
                ' WHERE date = ? AND away_team = ? AND home_team = ?'
                ' AND away_team_score = ? AND home_team_score = ?',
                (game_date, game[3], game[6], game[9], game[10]))
            if c.fetchall():
                print('Game ' + str(game_num) + ' has already been evaluated.')
                continue
            exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season,
                                          home_pitcher_hand, away_pitcher_hand,
                                          home_lineup, away_lineup, game_date)
            if exp_slgs[2] == 0:  # both pitchers had at least 300 PAs to use for simulation
                # Placeholder: the real INSERT statement was elided in the post.
                c.execute('[long string to insert results into database]')
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
            elif exp_slgs[2] == 1:  # one of the pitchers did not have enough PAs to qualify
                # Placeholder: the real INSERT statement was elided in the post.
                c.execute('[long string to insert results into database]')
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
    finally:
        # Release the database handle even if a simulation raises.
        conn.close()
import os
from multiprocessing import Process

# The guard is required on platforms that start children by re-importing this
# module (e.g. Windows): without it every child would try to spawn children.
if __name__ == '__main__':
    # NOTE: every process runs the whole of test_func, so this starts several
    # complete season simulations; it does not divide the games between them.
    processes = []
    # os.cpu_count() may return None when the count cannot be determined.
    for i in range(os.cpu_count() or 1):
        print('Registering process %d' % i)
        # The function defined above is test_func; "test" does not exist.
        processes.append(Process(target=test_func))
    for process in processes:
        process.start()
    # Wait for every child to finish before the parent exits.
    for process in processes:
        process.join()
====
==============编辑:新代码
#Child Process
# Location of the SQLite database used by the parent and by every worker
DB_PATH = 'C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db'


def simulate_games(games_list, counter, lock, season=None, algorithm_selection=1,
                   db_path=DB_PATH):
    """Worker loop: claim the next unprocessed game and simulate it.

    Each worker repeatedly takes the index stored in ``counter`` (under
    ``lock``), advances it, and simulates the game at that index, until the
    index reaches the end of ``games_list``.

    Args:
        games_list: rows of the season's game log, one row per game.
        counter: shared object whose ``value`` is the index of the next game
            to hand out (e.g. ``multiprocessing.Value('i', 0)``).
        lock: lock guarding every read and update of ``counter.value``.
        season: season year used in the table name and passed to the
            simulation. It must be passed explicitly because names assigned
            under the parent's ``__main__`` guard are not defined in a child
            that re-imports this module.
        algorithm_selection: 1 selects the Pemstein simulation; other values
            skip the result check and the simulation.
        db_path: path of the SQLite database to open.

    Raises:
        ValueError: if ``season`` is not provided.
    """
    if season is None:
        raise ValueError('season must be passed to simulate_games')
    season = str(season)  # table names are built by string concatenation
    total_games = len(games_list)
    # One connection per worker, opened once rather than once per iteration
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        while True:
            # acquire the lock which grants access to the shared variable
            with lock:
                # check the termination condition
                if counter.value >= total_games:
                    break
                # get the index of the game to simulate
                game_index = counter.value
                game_to_simulate = games_list[game_index]
                # update the counter for the next process
                counter.value += 1
            # 1-based game number used in the progress messages
            game_num = game_index + 1
            # Get away lineup in terms of MLB IDs (columns 105, 108, ..., 129)
            away_lineup = ConvertLineup(*game_to_simulate[105:130:3])
            # Get home lineup in terms of MLB IDs (columns 132, 135, ..., 156)
            home_lineup = ConvertLineup(*game_to_simulate[132:157:3])
            # Get away starting pitcher and hand in terms of MLB ID
            away_pitcher_id, away_pitcher_hand = GetPitcherIDandHand(game_to_simulate[101])[0][:2]
            # Get home starting pitcher and hand in terms of MLB ID
            home_pitcher_id, home_pitcher_hand = GetPitcherIDandHand(game_to_simulate[103])[0][:2]
            # Get the date of the game
            game_date = game_to_simulate[0]
            if algorithm_selection != 1:
                continue
            # Check if the current game has already been evaluated and entered
            # into the database; values are bound as parameters.
            c.execute(
                'SELECT * FROM pemstein_results_' + season +
                ' WHERE date = ? AND away_team = ? AND home_team = ?'
                ' AND away_team_score = ? AND home_team_score = ?',
                (game_date, game_to_simulate[3], game_to_simulate[6],
                 game_to_simulate[9], game_to_simulate[10]))
            if c.fetchall():
                print('Game ' + str(game_num) + ' has already been evaluated.')
                continue
            exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season,
                                          home_pitcher_hand, away_pitcher_hand,
                                          home_lineup, away_lineup, game_date)
            if exp_slgs[2] == 0:  # both pitchers had at least 300 PAs to use for simulation
                c.execute('long sql')
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
            elif exp_slgs[2] == 1:  # one of the pitchers did not have enough PAs to qualify
                c.execute('long sql')
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
    finally:
        # Release the database handle even if a simulation raises.
        conn.close()


if __name__ == "__main__":
    # Create sqlite database connection
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        # Query all games for season to be simulated; int() also guarantees
        # that only digits end up in the table name.
        season = int(input('Year to simulate: '))
        c.execute('SELECT * FROM gamelogs_' + str(season))
        season_games = c.fetchall()
        algorithmSelection = 1
        if algorithmSelection == 1:
            PemsteinSQLresults(str(season))
        counter = mp.Value('i', 0)
        lock = mp.Lock()
        # os.cpu_count() may return None when the count cannot be determined.
        # The season and the algorithm are passed as arguments because the
        # children cannot see names assigned under this guard.
        children = [
            mp.Process(target=simulate_games,
                       args=(season_games, counter, lock, str(season), algorithmSelection))
            for _ in range(os.cpu_count() or 1)
        ]
        for child in children:
            child.start()
        for child in children:
            child.join()
    finally:
        conn.close()
错误:
Traceback (most recent call last):
  File "C:\F5 Prediction Engine\Version 2\SimulateSeason v2.py", line 126, in <module>
    child.start()
  File "C:\Python\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
====
=========于是我去这个网站查了一些资料,并用从该网站复制的以下代码写了一个新脚本来尝试:
import multiprocessing as mp


def worker(num):
    """Print the number of this worker process."""
    # num is an int, so it must be converted before string concatenation.
    print('Worker:' + str(num))


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = mp.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
    # Wait for the children so the parent does not exit before they finish.
    # NOTE(review): if nothing is printed, check whether the script is run
    # from an IDE shell (such as IDLE) that may not display child-process
    # output; run it from a console to confirm.
    for p in jobs:
        p.join()
但它同样没有任何输出。该网站说它应该打印
Worker:0
Worker:1
等等,但我什么输出都没有看到。是我本机的环境有问题吗?
在我看来,您只是为每个 CPU 实例化了一个新进程,并让它们都运行您最初编写的同一个函数;但如果想使用多进程,就必须调整这个函数,并处理好进程之间的同步。
例如,您可以让一个主进程提示用户输入赛季年份,获取该年份的所有比赛,然后让各个子进程从得到的列表中读取比赛。请参阅以下示例:
# Parent Process
import multiprocessing as mp
import os

# The guard keeps child processes that re-import this module (e.g. on
# Windows) from running the parent logic again. In a single-file script this
# section belongs below the definition of simulate_games.
if __name__ == "__main__":
    # establish db connection [ ... ]
    season = int(input("Year to simulate: "))
    # season is an int, so convert it before building the table name.
    c.execute('SELECT * FROM gamelogs_' + str(season))
    season_games = c.fetchall()
    # Shared index of the next game to simulate, and the lock protecting it
    counter = mp.Value("i", 0)
    lock = mp.Lock()
    # One worker per CPU; os.cpu_count() may return None.
    children = [
        mp.Process(target=simulate_games, args=(season_games, counter, lock,))
        for _ in range(os.cpu_count() or 1)
    ]
    for child in children:
        child.start()
    # Wait for all workers to finish.
    for child in children:
        child.join()
# Child Process
def simulate_games(games_list, counter, lock):
    """Worker loop: claim games one at a time until none are left.

    Args:
        games_list: sequence of games to simulate.
        counter: shared object whose ``value`` is the index of the next
            unclaimed game.
        lock: lock guarding every read and update of ``counter.value``.
    """
    total_games = len(games_list)
    while True:
        # acquire the lock which grants the access to the shared variable
        with lock:
            # check the termination condition
            if counter.value >= total_games:
                break
            # get the game_num and the game to simulate
            game_num = counter.value
            game_to_simulate = games_list[game_num]
            # update counter for the next process
            counter.value += 1
        # The lock is released here, so the simulation runs in parallel.
        # Do simulation here
我们上面有一个父进程,它基本上是准备一些数据并创建新的子进程。
计数器通过一个特殊的类 Value 实现,它用于在进程之间共享标量值;Lock 基本上是一个互斥锁,我们用它来同步对计数器变量的访问,避免并发访问。请注意,您也可以使用 Value 内部自动创建的锁(通过 counter.get_lock() 获取),但我认为将两者分开会更容易理解。
子进程的处理方式是:先获取锁,读取计数器的值并将其加一,然后释放锁,再继续执行各自的正常工作,即模拟比赛。