为了加快训练神经网络的数据扩充速度,我正在尝试使用某种形式的并行处理来向GPU提供数据。目前的限制是我生成增强数据的速度,而不是GPU训练网络的速度。
如果我尝试将multiprocessing=True
与生成器一起使用,在Windows 10(v1083(64位下,Python 3.6.6中的keras 2.2.0会出现以下错误:
ValueError:使用带有
use_multiprocessing=True
的生成器不是在Windows上受支持(没有跨进程的生成器编组边界(。相反,请使用单线程/进程或多线程。
我在GitHub上发现了以下内容,因此这是Windows下keras的预期行为。该链接似乎建议移动到序列而不是生成器(尽管错误消息似乎建议使用多线程,但我也不知道如何将多线程与keras一起使用,而不是多处理-我可能在文档中忽略了它,但我就是没有找到它(。所以,我使用了下面的代码(使用序列修改示例(,但这也没有实现加速,或者在具有use_multiprocessing=True
的变体中只是冻结。
我是不是错过了一些关于如何让某种形式的并行生成器运行的明显内容?
最小(非(工作示例:
from keras.utils import Sequence
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import to_categorical
import numpy as np
class DummySequence(Sequence):
def __init__(self, x_set, y_set, batch_size):
self.x, self.y = x_set, y_set
self.batch_size = batch_size
def __len__(self):
return int(np.ceil(len(self.x) / float(self.batch_size)))
def __getitem__(self, idx):
batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
return np.array(batch_x), np.array(batch_y)
x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)
seq = DummySequence(x, y, 10)
model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
loss='categorical_crossentropy',
metrics=['accuracy'])
print('single worker')
model.fit_generator(generator=seq,
steps_per_epoch = 100,
epochs = 2,
verbose=2,
workers=1)
print('achieves no speed-up')
model.fit_generator(generator=seq,
steps_per_epoch = 100,
epochs = 2,
verbose=2,
workers=6,
use_multiprocessing=False)
print('Does not run')
model.fit_generator(generator=seq,
steps_per_epoch = 100,
epochs = 2,
verbose=2,
workers=6,
use_multiprocessing=True)
与序列相结合,使用multi_prrocessing=False和workers=例如4即可工作。
我刚刚意识到,在问题中的示例代码中,我没有看到加速,因为数据生成得太快了。通过插入时间睡眠(2(,这一点变得显而易见。
class DummySequence(Sequence):
def __init__(self, x_set, y_set, batch_size):
self.x, self.y = x_set, y_set
self.batch_size = batch_size
def __len__(self):
return int(np.ceil(len(self.x) / float(self.batch_size)))
def __getitem__(self, idx):
batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]
time.sleep(2)
return np.array(batch_x), np.array(batch_y)
x = np.random.random((100, 3))
y = to_categorical(np.random.random(100) > .5).astype(int)
seq = DummySequence(x, y, 10)
model = Sequential()
model.add(Dense(32, input_dim=3))
model.add(Dense(2, activation='softmax'))
model.compile(optimizer='rmsprop',
loss='categorical_crossentropy',
metrics=['accuracy'])
print('single worker')
model.fit_generator(generator=seq,
steps_per_epoch = 10,
epochs = 2,
verbose=2,
workers=1)
print('achieves speed-up!')
model.fit_generator(generator=seq,
steps_per_epoch = 10,
epochs = 2,
verbose=2,
workers=4,
use_multiprocessing=False)
这在我的笔记本电脑上产生了以下内容:
single worker
>>> model.fit_generator(generator=seq,
... steps_per_epoch = 10,
... epochs = 2,
... verbose=2,
... workers=1)
Epoch 1/2
- 20s - loss: 0.6984 - acc: 0.5000
Epoch 2/2
- 20s - loss: 0.6955 - acc: 0.5100
和
achieves speed-up!
>>> model.fit_generator(generator=seq,
... steps_per_epoch = 10,
... epochs = 2,
... verbose=2,
... workers=4,
... use_multiprocessing=False)
Epoch 1/2
- 6s - loss: 0.6904 - acc: 0.5200
Epoch 2/2
- 6s - loss: 0.6900 - acc: 0.5000
重要提示:您可能希望__init___
中包含self.lock = threading.Lock()
,然后__getitem__
中包含with self.lock:
。试着在with self.lock:
中执行所需的绝对最小值,据我所知,这将是对self.xxxx
的任何引用(在with self.lock:
块运行时防止多线程(。
此外,如果您希望多线程加速计算(即CPU操作是极限(,请不要期望任何加速。全局解释器锁定(GIL(将防止这种情况发生。只有当I/O操作受到限制时,多线程才会对您有所帮助。显然,为了加速CPU计算,我们需要真正的多处理,而keras
目前在Windows 10上不支持这种处理。也许手工制作一个多处理生成器是可能的(我不知道(。
我在我的GPU/CPU监控解决方案中测试了您的提案。
- 在我的情况下,速度增加了约10%(440秒对550秒(
- CPU一次只使用一个内核。GPU负载不超过22%
看起来一个核心以更高效的方式运行,分配了更多的工作人员。但是,没有启用真正的多处理。
TF 2.0
Keras 2.2.4