Tensorflow 数据集 API 中的过采样功能


我想

问一下数据集的当前API是否允许实现过采样算法?我处理高度不平衡的阶级问题。我在想,在数据集解析(即在线生成(期间对特定类进行过采样会很好。我已经看到了rejection_resample函数的实现,但是这会删除样本而不是复制它们,并且会减慢批处理生成速度(当目标分布与初始分布有很大不同时(。我想实现的是:举个例子,看它的类概率决定是否重复。然后调用dataset.shuffle(...) dataset.batch(...)并获取迭代器。最好的(在我看来(方法是对低可能的类进行过采样,并对最可能的类进行子采样。我想在线进行,因为它更灵活。

此问题已在问题 #14451 中解决。只需在此处发布 anwser,使其对其他开发人员更可见。

示例代码是低频率类的过采样和高频率类的过采样,在我的例子中class_target_prob只是均匀分布。我想检查最近手稿中的一些结论 卷积神经网络中类不平衡问题的系统研究

特定类的过采样是通过调用以下项来完成的:

dataset = dataset.flat_map(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(oversample_classes(x))
)

这是完成所有操作的完整代码片段:

# sampling parameters
oversampling_coef = 0.9  # if equal to 0 then oversample_classes() always returns 1
undersampling_coef = 0.5  # if equal to 0 then undersampling_filter() always returns True
def oversample_classes(example):
    """
    Returns the number of copies of given example
    """
    class_prob = example['class_prob']
    class_target_prob = example['class_target_prob']
    prob_ratio = tf.cast(class_target_prob/class_prob, dtype=tf.float32)
    # soften ratio is oversampling_coef==0 we recover original distribution
    prob_ratio = prob_ratio ** oversampling_coef 
    # for classes with probability higher than class_target_prob we
    # want to return 1
    prob_ratio = tf.maximum(prob_ratio, 1) 
    # for low probability classes this number will be very large
    repeat_count = tf.floor(prob_ratio)
    # prob_ratio can be e.g 1.9 which means that there is still 90%
    # of change that we should return 2 instead of 1
    repeat_residual = prob_ratio - repeat_count # a number between 0-1
    residual_acceptance = tf.less_equal(
                        tf.random_uniform([], dtype=tf.float32), repeat_residual
    )
    residual_acceptance = tf.cast(residual_acceptance, tf.int64)
    repeat_count = tf.cast(repeat_count, dtype=tf.int64)
    return repeat_count + residual_acceptance

def undersampling_filter(example):
    """
    Computes if given example is rejected or not.
    """
    class_prob = example['class_prob']
    class_target_prob = example['class_target_prob']
    prob_ratio = tf.cast(class_target_prob/class_prob, dtype=tf.float32)
    prob_ratio = prob_ratio ** undersampling_coef
    prob_ratio = tf.minimum(prob_ratio, 1.0)
    acceptance = tf.less_equal(tf.random_uniform([], dtype=tf.float32), prob_ratio)
    return acceptance

dataset = dataset.flat_map(
    lambda x: tf.data.Dataset.from_tensors(x).repeat(oversample_classes(x))
)
dataset = dataset.filter(undersampling_filter)
dataset = dataset.repeat(-1)
dataset = dataset.shuffle(2048)
dataset = dataset.batch(32)
sess.run(tf.global_variables_initializer())
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

更新 #1

这是一个简单的jupyter笔记本,它在玩具模型上实现了上述过采样/欠采样。

tf.data.experimental.rejection_resample似乎

是一个更好的方法,因为它不需要"class_prob"和"class_target_prob"功能。
尽管它是欠采样而不是过度采样,但具有相同的目标分布和训练步骤,但它的工作原理相同。

这个QnA对我很有帮助。所以我用我的相关经验写了一篇关于它的博客文章。

https://vallum.github.io/Optimizing_parallel_performance_of_resampling_with_tensorflow.html

我希望对带有

重采样的 Tensorflow 输入管道优化感兴趣的人可以从中得到一些想法。

一些操作可能是不必要的冗余,但在我个人的情况下并不是太大的性能下降者。

 dataset = dataset.map(undersample_filter_fn, num_parallel_calls=num_parallel_calls) 
 dataset = dataset.flat_map(lambda x : x) 

使用身份 lambda 函数flat_map仅用于合并幸存(和空(记录

# Pseudo-code for understanding of flat_map after maps
#parallel calls of map('A'), map('B'), and map('C')
map('A') = 'AAAAA' # replication of A 5 times
map('B') = ''      # B is dropped
map('C') = 'CC'    # replication of C twice
# merging all map results
flat_map('AAAA,,CC') = 'AAAACC'

经过考验和磨难,这里是二元情况的粗略解决方案。我按标签将数据集一分为二,然后打乱并重复较小的子数据集,并将它们连接在一起,仅根据需要获取尽可能多的样本。因此,对于具有x,y属性的数据集:

def _ds_is_batched(ds: tf.data.Dataset):
    # returns True iff the dataset is batched
    return hasattr(ds, '_batch_size') and ds._batch_size.numpy() > 1
def _get_ds_len(ds: tf.data.Dataset):
    # returns number of samples in the dataset
    n = 0
    _is_batched = _ds_is_batched(ds)
    for _ in ds:
        n += _[0].numpy().shape[0] if _is_batched else 1
    return n
def balance_ds(ds: tf.data.Dataset, shuffle_bs: int = 1000):
    # returns a new dataset with oversampling of the less-frequent class
    ds0 = ds.filter(lambda x, y, *args: tf.math.equal(y, 0))
    ds1 = ds.filter(lambda x, y, *args: tf.math.equal(y, 1))
    n0, n1 = _get_ds_len(ds0), _get_ds_len(ds1)
    # shuffle and repeat the less frequent class in order to oversample it
    if n0 < n1:
        ds0 = ds0.shuffle(shuffle_bs).repeat(int(np.ceil(n1/n0))).take(n1)
    else:
        ds1 = ds1.shuffle(shuffle_bs).repeat(int(np.ceil(n0 / n1))).take(n0)
    res = tf.data.experimental.sample_from_datasets([ds0, ds1])
    res = res.shuffle(shuffle_bs)
    return res

最新更新