如何确保每个关键点都在所有Beam分区中表示



我正在组装一个python Apache Beam管道,以便在Dataflow上运行。该管道组装TensorFlowTFExamplesPCollection,最终使用内置的tfrecordioAPI保存到TFRecords中。

我想将我的数据分为三部分:训练(80%(、验证(10%(和测试(10%(。理想情况下,我希望能够为每个标签循环分配元素,这样就可以强制执行特定的拆分,而不是让随机机会。实现这一点的非波束代码看起来像:

distribution_schedule = [1, 2, 0, 0, 0, 0, 0, 0, 0, 0] # train = 0, valid = 1, test = 2
class ExampleDistributor():
def __init__(self):
self.examples_per_label = {}
def assign(label_example_pair):
label_examples = self.examples_per_label.get(label_example_pair[0], 0)
return = distribution_schedule[label_examples % len(distribution_schedule)]
self.examples_per_label[label] = label_examples + 1

这将在每个标签的基础上,将第一个元素分配给验证,第二个分配给测试,第三个到第十个分配给训练,然后重复,直到筋疲力尽。如果我只有三个例子(不现实,但只是举个例子(,那么每个分区都会得到一个。或者,如果我有999个例子,那么100个将进行验证,100个进行测试,799个进行培训。这就是谷歌的AutoML服务分发示例的方式,我也在尝试这样做。这也允许我更改时间表以强制执行不同的分发。

正如我所说,这是我理想的情况。或者,我也会满足于使用随机分配的机制(就像大多数情况一样(,但保证每个分区至少接收一个示例

我知道简单的解决方案是在分区转换中使用一个简单的随机化器lambda,但我不喜欢一个组总是有机会——无论它可能多么小——一个标签可以有零个例子。

如何确保每个分区至少有每个标签的一个示例?此外,如果波束是可能的,我如何进行循环波束变换?我想这可能涉及到GroupByKey转换,以按标签对示例进行分组,但在那之后,我如何确保每个分区都有示例?

对于任何大小合理的数据集,随机分区基本上都不会导致空分区。考虑到这一点,最简单的做法是添加一个检查,检查每个分区是否至少有一个元素(使用随机分区(,如果没有,则使管道失败。类似的东西

partitions = input
| 'Partition' >> beam.Partition(...)
def checkNonZero(n):
if n == 0:
raise Exception('...')
return n
for partition in partitions:
partition
| 'Count' >> beam.combiners.Count.Globally()
| 'Check non-zero' >> beam.Map(checkNonZero)
# process partitions normally here

最新更新