Using Pygame with parallelism in Python



I'm trying to train a neural network to play an SMB1 game made with Pygame. To speed things up, I want to use parallel processing so that multiple copies of the game are played simultaneously by different population members (on different training data).

The root of my problem is that Pygame isn't inherently instance-based; that is, it will only ever spawn a single window with a single display object. Since I can't create multiple Pygame windows and display objects, one per process, the processes would have to share one display object. This brings me to my first question: is there a way to have multiple instances of pygame, and if not, is there a (performance-light) way to draw to the display concurrently? That is, each game would draw to a different part of the same window.

However, I don't really need every game to be rendered; I only care that at least one game instance is rendered so I can monitor its progress. My solution, then, was to assign each game a process id, and only the game with process id 0 would actually be displayed. Concurrency problem solved! To achieve this, I used multiprocessing.Process:

processes = [];
genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
for i in range(self.runConfig.parallel_processes):
    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
    processes.append(process);
for process in processes:
    process.start();
for process in processes:
    process.join();

However, this created a problem of its own when multiprocessing pickles the objects: AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'. Note: config and RunnerConfig are two different things; the former comes from the neat library I'm using and is the config passed to the function, while the latter is my own class and is an attribute of the class that starts the processes.

After some research, it seems this happens because I'm using an instance method, so multiprocessing pickles the class instance, which includes the aforementioned RunnerConfig, which contains lambda functions, which are unpicklable. This is hard to work around because those lambda functions are used specifically in self.eval_genome_batch. This leads to the second question: is it possible to use multiprocessing.Process in a way that doesn't require pickling the outer class, so that the lambda functions are never pickled?
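One common way to sidestep this (a sketch of my own, not code from the project; `eval_batch_worker` and its arguments are hypothetical names) is to make the process target a module-level function that takes only picklable arguments, so that `self`, and with it RunnerConfig and its lambdas, is never pickled at all:

```python
import multiprocessing

# Module-level functions are pickled by reference (just their qualified
# name), so only the arguments get serialized - never an enclosing class.
def eval_batch_worker(genome_batch, config_path, process_num):
    # Hypothetical worker: rebuild any needed state from picklable
    # primitives (e.g. a config file path) instead of capturing self.
    return [(genome, process_num) for genome in genome_batch]

if __name__ == '__main__':
    batches = [['g1', 'g2'], ['g3', 'g4']]
    processes = [
        multiprocessing.Process(target=eval_batch_worker,
                                args=(batch, 'config.txt', i))
        for i, batch in enumerate(batches)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```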

Finally, after more research, it turned out that instead of multiprocessing, which uses pickle, I could use pathos.multiprocessing, which uses dill, and dill can pickle lambda functions. Hooray!

But no. There's one final hitch. Pathos.multiprocessing only has .map and the .map-equivalent functions from multiprocessing, which doesn't let me control the processes themselves. That means when the function is called, there is no way (afaik) to tell the program which process id the game is running under, and therefore no way to tell whether to render to the screen. So the final question is: is there a way to use pathos.multiprocessing.map (or, really, any library's parallel function) such that a) it doesn't break on lambda functions and b) it can tell the called function which process id is being used?
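For point (b), one workaround (a sketch; `run_batch` is a hypothetical stand-in for the evaluation function) is to pass the worker's id explicitly as an argument, pairing each batch with its index before handing it to the pool. Stdlib `multiprocessing.Pool.starmap` is shown; pathos's pools similarly accept multiple argument iterables:

```python
import multiprocessing

def run_batch(batch, process_id):
    # The scheme from the question: only the worker with id 0 renders.
    render = (process_id == 0)
    return [(item, render) for item in batch]

if __name__ == '__main__':
    batches = [['g1', 'g2'], ['g3', 'g4'], ['g5', 'g6']]
    # Pair each batch with its index so the worker knows which one it is.
    tasks = [(batch, i) for i, batch in enumerate(batches)]
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(run_batch, tasks)
```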

One last note: I know the simplest answer is just to not render to Pygame at all. That would fix everything. However, being able to watch the program's progress and learning is very useful and important to me.

So, I have a list of different problems, any one of which, if solved, would fix everything:

  • A way to use pygame as multiple different instances in different threads spawned from the same process
  • A way to safely work with pygame's display (and update clock) concurrently
  • A way to use multiprocessing.Process such that it doesn't need to pickle the method's class but can still access class variables
  • A multiprocessing library that:
    • either doesn't need to pickle lambda functions, or is able to, and
    • has a way to tell the subprocess which process worker is being used

EDIT: here is most of the relevant code. Only the relevant methods are included since the classes are quite long. If you want, the source code can be found on my github.

game_runner_neat.py: the class that starts the parallel processing

import neat
import baseGame
import runnerConfiguration
import os.path
import os
import visualize
import random
import numpy as np
#import concurrent.futures
import multiprocessing
from logReporting import LoggingReporter
from renderer import Renderer as RendererReporter
from videofig import videofig as vidfig
from neat.six_util import iteritems, itervalues

class GameRunner:

    #if using default version, create basic runner and specify game to run
    def __init__(self,game,runnerConfig):
        self.game = game;
        self.runConfig = runnerConfig;

    #skip some code

    #parallel version of eval_genomes_feedforward
    def eval_genome_batch_feedforward(self,genomes,config,processNum):
        for genome_id, genome in genomes:
            genome.fitness += self.eval_genome_feedforward(genome,config,processNum=processNum);

    def eval_training_data_batch_feedforward(self,genomes,config,data,processNum,lock):
        for datum in data:
            for genome_id,genome in genomes:
                genome.increment_fitness(lock,self.eval_genome_feedforward(genome,config,processNum=processNum,trainingDatum=datum)); #increment_fitness allows multiple threads to change the fitness of the same genome safely

    #evaluate a population with the game as a feedforward neural net
    def eval_genomes_feedforward(self, genomes, config):
        for genome_id,genome in genomes:
            genome.fitness = 0; #sanity check
        if (self.runConfig.training_data is None):
            if (self.runConfig.parallel):
                processes = [];
                genome_batches = np.array_split(genomes,self.runConfig.parallel_processes);
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_genome_batch_feedforward,args=(genome_batches[i],config,i));
                    processes.append(process);
                for process in processes:
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for genome_id, genome in genomes:
                    genome.fitness += self.eval_genome_feedforward(genome,config)
        else:
            if (self.runConfig.parallel):
                processes = [];
                data_batches = np.array_split(self.runConfig.training_data,self.runConfig.parallel_processes);
                lock = multiprocessing.Lock();
                for i in range(self.runConfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_training_data_batch_feedforward,args=(genomes,config,data_batches[i],i,lock));
                    processes.append(process);
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for datum in self.runConfig.training_data:
                    for genome_id, genome in genomes:
                        genome.fitness += self.eval_genome_feedforward(genome,config,trainingDatum=datum)

runnerConfiguration.py (the class with the lambda functions, passed to GameRunner in init):

class RunnerConfig:

    def __init__(self,gameFitnessFunction,gameRunningFunction,logging=False,logPath='',recurrent=False,trial_fitness_aggregation='average',custom_fitness_aggregation=None,time_step=0.05,num_trials=10,parallel=False,returnData=[],gameName='game',num_generations=300,fitness_collection_type=None):
        self.logging = logging;
        self.logPath = logPath;
        self.generations = num_generations;
        self.recurrent = recurrent;
        self.gameName = gameName;
        self.parallel = parallel;
        self.time_step = time_step;
        self.numTrials = num_trials;
        self.fitnessFromGameData = gameFitnessFunction;
        self.gameStillRunning = gameRunningFunction;
        self.fitness_collection_type = fitness_collection_type;
        self.returnData = returnData;
##        for (datum in returnData):
##            if (isinstance(datum,IOData)):
##                [returnData.append(x) for x in datum.getSplitData()];
##            else:
##                returnData.append(datum);

        if (trial_fitness_aggregation == 'custom'):
            self.fitnessFromArray = custom_fitness_aggregation;
        if (trial_fitness_aggregation == 'average'):
            self.fitnessFromArray = lambda fitnesses : sum(fitnesses)/len(fitnesses);
        if (trial_fitness_aggregation == 'max'):
            self.fitnessFromArray = lambda fitnesses : max(fitnesses);
        if (trial_fitness_aggregation == 'min'):
            self.fitnessFromArray = lambda fitnesses : min(fitnesses);

gameFitnessFunction and gameRunningFunction are functions passed in for custom training behavior.
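For what it's worth, the lambdas above are exactly what makes RunnerConfig unpicklable. A sketch of one possible fix (the names here are mine, not from the post): replace them with module-level functions selected from a dict, since module-level functions pickle by reference:

```python
import pickle

# Module-level aggregation functions pickle by reference, unlike
# lambdas defined inside __init__.
def _average(fitnesses):
    return sum(fitnesses) / len(fitnesses)

_AGGREGATIONS = {'average': _average, 'max': max, 'min': min}

class PicklableRunnerConfig:
    def __init__(self, trial_fitness_aggregation='average'):
        self.fitnessFromArray = _AGGREGATIONS[trial_fitness_aggregation]

# The instance now survives a pickle round trip.
cfg = pickle.loads(pickle.dumps(PicklableRunnerConfig('max')))
```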

When the program tries to run eval_genomes_feedforward with runnerConfig.parallel=True, I get the following full error message:

Traceback (most recent call last):
  File "c:/Users/harrison_truscott/Documents/GitHub/AI_game_router/Neat/smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'

When the first process breaks, I then get a second error message as the next process is interrupted by the first one's incomplete start:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 263, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\smb1Py_runner.py", line 94, in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 75, in run
    winner = pop.run(self.eval_genomes,self.runConfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py", line 102, in run
    fitness_function(list(iteritems(self.population)), self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 204, in eval_genomes
    self.eval_genomes_feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py", line 276, in eval_genomes_feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\Python38\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

As an aside, multiprocessing.freeze_support() is the first function I call in the main file being run.
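For reference, freeze_support() alone doesn't address this RuntimeError: with the spawn start method on Windows, each child re-imports the main module, so any code that starts processes must itself sit behind the `if __name__ == '__main__':` guard. A minimal sketch of the idiom:

```python
import multiprocessing

def worker(n):
    # Work done in the child process.
    return n * n

if __name__ == '__main__':
    # Everything that spawns processes lives behind this guard, so a
    # child's re-import of the main module doesn't spawn again.
    multiprocessing.freeze_support()  # only matters for frozen executables
    with multiprocessing.Pool(2) as pool:
        squares = pool.map(worker, [1, 2, 3])
```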

I'll try to address the main problems. My understanding of your actual problem is quite limited, since I don't know what your code really does.

"A way to use pygame as multiple different instances in different threads spawned from the same process"

This won't work, because pygame is built on SDL2: you should not expect to be able to create a window, render, or receive events on any thread other than the main one.

"A way to safely work with pygame's display (and update clock) concurrently"

Same as above: the display only works in the main thread.

"A way to use multiprocessing.Process such that it doesn't need to pickle the method's class but can still access class variables"

You could use something like dill to pickle the methods, but copying Python objects wholesale between processes feels wrong (to me). I'd go with a different solution.

"A multiprocessing library that:"

"1. either doesn't need to pickle lambda functions, or is able to"

You need to serialize Python objects in order to send them between processes.

"2. has a way to tell the subprocess which process worker is being used"

I don't understand what this means.


In my opinion, this problem would be solved by better separating the data from the visualization. The trainer shouldn't know anything about the visualization, since it doesn't depend on how you want to display it. So there shouldn't be any reason to share the pygame display.

Once that's done, doing what you want shouldn't be much of a problem (multithreading always causes problems, of course); I'd try to avoid pickling Python objects and functions and instead only pass basic primitives between threads and processes. It seems you should be able to assign self.fitnessFromArray with a simple int and do the min/avg/max calculation in the thread/process based on its value.
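Following that suggestion, a minimal sketch (my own names, using a string tag; a plain int would work the same way) of resolving the aggregation inside the worker, so nothing unpicklable ever crosses the process boundary:

```python
def aggregate(fitnesses, mode):
    # 'mode' is a plain primitive sent to the worker; the actual
    # function is chosen here instead of shipping a lambda across.
    if mode == 'average':
        return sum(fitnesses) / len(fitnesses)
    if mode == 'max':
        return max(fitnesses)
    if mode == 'min':
        return min(fitnesses)
    raise ValueError('unknown aggregation: %s' % mode)
```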

If you go with threading, the main thread would be responsible for rendering. It would also spawn the threads for the training. When the threads are done, they'd return their result (or put it into thread-safe storage), and the main thread would poll the data and display the results. If the training work takes longer than one frame, split up the work so that each thread only does part of the training and can continue where it left off on the next frame.
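That pattern might look like the sketch below (hypothetical names; a real main loop would poll the queue once per frame and render, rather than joining at the end):

```python
import queue
import threading

def train_batch(batch, results):
    # Worker thread: do one slice of training, push the result into
    # thread-safe storage for the main thread to pick up.
    results.put(sum(x * x for x in batch))

results = queue.Queue()
threads = [threading.Thread(target=train_batch, args=(b, results))
           for b in ([1, 2], [3, 4])]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Main thread: drain the queue (and would render the progress here).
collected = []
while not results.empty():
    collected.append(results.get())
```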

If you want separate processes, the principle is the same. The main process starts several training processes and connects to them with sockets. From the sockets you poll information about the state of the program and display it. It's basically a client-server architecture (albeit on localhost), where the training scripts are the servers and the main process is the client.
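The stdlib's `multiprocessing.connection` provides a convenient socket wrapper for exactly this. Below is a minimal sketch (a thread stands in for the training process, and the status dict is made up):

```python
import threading
from multiprocessing.connection import Client, Listener

# Bind an ephemeral localhost port; the trainer acts as the server.
listener = Listener(('localhost', 0), authkey=b'secret')

def training_server():
    # Training process: accept a connection and report its status.
    with listener.accept() as conn:
        conn.send({'generation': 1, 'best_fitness': 0.5})

server = threading.Thread(target=training_server)
server.start()

# Main (rendering) process: poll the trainer for status to display.
with Client(listener.address, authkey=b'secret') as conn:
    status = conn.recv()
server.join()
listener.close()
```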
