While training my model I ran into the problem described in the post Tensorflow - Keras: Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. My question now is: does the solution mentioned by @Graham501617 also work with generators? Here is some dummy code for what I have been using so far:
from tensorflow.keras.utils import Sequence

class BatchGenerator(Sequence):

    def __init__(self, some_args):
        ...

    def __len__(self):
        num_batches_in_sequence = ...
        return num_batches_in_sequence

    def __getitem__(self, _):
        data, labels = get_one_batch(self.some_args)
        return data, labels
In the main script I do something like this:
import tensorflow as tf

train_generator = BatchGenerator(some_args)
valid_generator = BatchGenerator(some_args)

cross_device_ops = tf.distribute.HierarchicalCopyAllReduce(num_packs=2)
strategy = tf.distribute.MirroredStrategy(cross_device_ops=cross_device_ops)

with strategy.scope():
    model = some_model
    model.compile(some_args)

history = model.fit(
    x=train_generator,
    validation_data=valid_generator,
    ...
)
I probably have to modify the __getitem__ function somehow, right?
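For reference, my understanding of the workaround from the linked post, applied to a plain tf.data.Dataset, is roughly the following (a sketch; dataset here stands for whatever would be passed to fit()):

options = tf.data.Options()
# Either turn auto-sharding off entirely...
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
# ...or shard by data elements instead of by files:
# options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)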
You have to wrap your generator into a single function…
The example below assumes your data is stored as numpy arrays (.npy files), that each file already contains one mini-batch of the correct size, and that the files are labelled 0_x.npy, 1_x.npy, 2_x.npy, etc. Both the data and the label arrays are float64.
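If you want to try the snippets below on dummy data, something like this writes files in that layout (the batch size of 32 and the feature width of 10 are made-up values here):

import numpy as np
from pathlib import Path

out_dir = Path("data/training")
out_dir.mkdir(parents=True, exist_ok=True)
for i in range(5):                                           # five dummy mini-batches
    np.save(out_dir / f"{i}_x.npy", np.random.rand(32, 10))  # data, float64
    np.save(out_dir / f"{i}_y.npy", np.random.rand(32, 1))   # labels, float64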
import os
from pathlib import Path
import tensorflow as tf
import numpy as np

# Your new generator as a function rather than an object you need to instantiate
def getNextBatch(stop, data_dir):
    i = 0
    # tf.data passes string args as bytes, so decode back to str
    data_dir = data_dir.decode('ascii')
    while True:
        while i < stop:
            x = np.load(str(Path(data_dir + "/" + str(i) + "_x.npy")))
            y = np.load(str(Path(data_dir + "/" + str(i) + "_y.npy")))
            yield x, y
            i += 1
        i = 0
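You can sanity-check the generator on its own before wiring it into tf.data; note that from_generator will pass the args as bytes, which is why data_dir gets decoded above (this assumes the dummy files from earlier exist under data/training):

gen = getNextBatch(3, b"data/training")  # bytes on purpose, mimicking how tf.data passes string args
x, y = next(gen)
print(x.shape, x.dtype, y.shape, y.dtype)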
# Make a dataset given the directory and strategy
def makeDataset(generator_func, dir, strategy=None):
    # Get the number of mini-batches (two files, x and y, per batch)
    data_size = int(len([name for name in os.listdir(dir)
                         if os.path.isfile(os.path.join(dir, name))]) / 2)

    # Make a dataset from the generator. MAKE SURE TO SPECIFY THE DATA TYPE!!!
    ds = tf.data.Dataset.from_generator(generator_func,
                                        args=[data_size, dir],
                                        output_types=(tf.float64, tf.float64))

    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
    ds = ds.with_options(options)

    # Optional: make it a distributed dataset if you're using a strategy
    if strategy is not None:
        ds = strategy.experimental_distribute_dataset(ds)

    return ds
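Not part of the original snippet, but if input loading becomes a bottleneck you could also prefetch inside makeDataset just before the return (tf.data.AUTOTUNE exists in recent TF 2.x releases; older versions use tf.data.experimental.AUTOTUNE):

ds = ds.prefetch(tf.data.AUTOTUNE)  # load the next batch while the current one trains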
training_ds = makeDataset(getNextBatch, str(Path(data_dir + "/training")), None)
validation_ds = makeDataset(getNextBatch, str(Path(data_dir + "/validation")), None)
model.fit(training_ds,
          epochs=epochs,
          callbacks=callbacks,
          validation_data=validation_ds)
You may need to pass the number of steps per epoch to the fit() call, since the generator loops forever and the dataset therefore has no finite length; in that case you can use the generator you have already written.
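For example, reusing the same file-counting logic as in makeDataset (countBatches is a helper name made up here, and data_dir is the same base directory as in the snippets above):

import os
from pathlib import Path

def countBatches(dir):
    # Two files (x and y) per mini-batch
    return int(len([name for name in os.listdir(dir)
                    if os.path.isfile(os.path.join(dir, name))]) / 2)

history = model.fit(training_ds,
                    epochs=epochs,
                    steps_per_epoch=countBatches(str(Path(data_dir + "/training"))),
                    callbacks=callbacks,
                    validation_data=validation_ds,
                    validation_steps=countBatches(str(Path(data_dir + "/validation"))))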